[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46278355
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java ---
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.BaseTestingActorGateway;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.messages.Messages;
+import org.apache.flink.runtime.messages.TaskMessages;
+import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import scala.concurrent.ExecutionContext;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex;
+import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.whenNew;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ExecutionVertex.class)
+public class ExecutionVertexStopTest {
+
+	@Test
+	public void testStop() throws Exception {
+		final JobVertexID jid = new JobVertexID();
+		final ExecutionJobVertex ejv = getExecutionVertex(jid);
+
+		Execution executionMock = mock(Execution.class);
+		whenNew(Execution.class).withAnyArguments().thenReturn(executionMock);
+
+		final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0],
+				AkkaUtils.getDefaultTimeout());
+
+		vertex.stop();
+
+		verify(executionMock).stop();
+	}
+
+	private static ActorSystem system;
+
+	@AfterClass
+	public static void teardown(){
+		if(system != null) {
+			JavaTestKit.shutdownActorSystem(system);
+			system = null;
+		}
+	}
+
+	static boolean receivedStopSignal;
--- End diff --

It's common practice in Flink's code base to put static fields at the very
beginning of the class definition.
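For illustration, a hypothetical skeleton that follows this convention (static fields first, then methods). The class name and the plain-Java stand-ins for the JUnit and Akka types are assumptions made so the sketch compiles on its own; it is not the test from the PR.

```java
// Hypothetical skeleton illustrating the layout convention only.
// JUnit/Akka types are replaced by plain Java so the sketch is self-contained.
class ExecutionVertexStopTestLayout {

	// Static fields at the very beginning of the class definition.
	private static Object system; // stands in for the Akka ActorSystem
	static boolean receivedStopSignal;

	// Methods follow the fields.
	static void onStopSignal() {
		receivedStopSignal = true; // records that a stop signal arrived
	}

	static void teardown() {
		if (system != null) {
			system = null;
		}
	}
}
```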


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46283023
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java ---

OK, will do. (For a test, I thought it would be better to keep it next to the
single test method where it is used.)




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46282619
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.taskmanager;
+
+import java.lang.reflect.Field;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.Stoppable;
+import org.apache.flink.runtime.memory.MemoryManager;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import scala.concurrent.duration.FiniteDuration;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ TaskDeploymentDescriptor.class, JobID.class, FiniteDuration.class })
+public class TaskStopTest {
+	private AbstractInvokable taskMock;
+	private Task task;
+
+	public void doMocking() throws Exception {
+		TaskDeploymentDescriptor tddMock = mock(TaskDeploymentDescriptor.class);
+		when(tddMock.getNumberOfSubtasks()).thenReturn(1);
+		when(tddMock.getJobID()).thenReturn(mock(JobID.class));
+		when(tddMock.getVertexID()).thenReturn(mock(JobVertexID.class));
+		when(tddMock.getExecutionId()).thenReturn(mock(ExecutionAttemptID.class));
+		when(tddMock.getTaskName()).thenReturn("taskName");
+		when(tddMock.getJobConfiguration()).thenReturn(mock(Configuration.class));
+		when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class));
+		when(tddMock.getInvokableClassName()).thenReturn("className");
+
+		task = new Task(tddMock, mock(MemoryManager.class), mock(IOManager.class), mock(NetworkEnvironment.class),
+				mock(BroadcastVariableManager.class), mock(ActorGateway.class), mock(ActorGateway.class),
+				mock(FiniteDuration.class), mock(LibraryCacheManager.class), mock(FileCache.class),
+				mock(TaskManagerRuntimeInfo.class));
+		Field f = task.getClass().getDeclaredField("invokable");
+		f.setAccessible(true);
+		f.set(task, taskMock);
--- End diff --

Why do you use class fields to pass in parameters like `taskMock`?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46299763
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java ---

Actually, I was referring to `taskMock`. Why do you keep it as a class field
instead of passing it as a parameter to the `doMocking` method?
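A minimal sketch of the suggested refactoring, assuming only what the diff shows: the task has a private `invokable` field that the test sets via reflection. `FakeTask` and `createTaskWithInvokable` are hypothetical stand-ins, not Flink's `Task` or the PR's code; the point is that the invokable comes in as a parameter and the configured task is returned instead of being stored in a class field.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for Flink's Task: only the private "invokable"
// field that the test injects via reflection is modelled here.
class FakeTask {
	private Object invokable;

	Object getInvokable() {
		return invokable;
	}
}

class TaskStopTestSketch {

	// The invokable is a parameter rather than a class field, so each test
	// states explicitly what it injects, and the task is returned to the caller.
	static FakeTask createTaskWithInvokable(Object invokable) {
		FakeTask task = new FakeTask();
		try {
			Field f = task.getClass().getDeclaredField("invokable");
			f.setAccessible(true);
			f.set(task, invokable);
		} catch (ReflectiveOperationException e) {
			throw new IllegalStateException("could not inject invokable", e);
		}
		return task;
	}
}
```

Each test can then create its own mock and call the helper with it, so no state is shared between test methods.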




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46300103
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---
@@ -227,4 +235,121 @@ public void testRequestPartitionState() throws Exception {
 			}
 		}};
 	}
+
+	@Test
+	public void testStopSignal() throws Exception {
+		new JavaTestKit(system) {{
+			// Setup
+			TestingCluster cluster = null;
+
+			try {
+				cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+
+				// Create a task
+				final JobVertex sender = new JobVertex("Sender");
+				sender.setParallelism(2);
+				sender.setInvokableClass(StoppableInvokable.class);
+
+				final JobGraph jobGraph = new JobGraph("Blocking test job", JobType.STREAMING, sender);
+				final JobID jid = jobGraph.getJobID();
+
+				final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+
+				// we can set the leader session ID to None because we don't use this gateway to send messages
+				final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+
+				// Submit the job and wait for all vertices to be running
+				jobManagerGateway.tell(
+					new SubmitJob(
+						jobGraph,
+						ListeningBehaviour.EXECUTION_RESULT),
+					testActorGateway);
+				expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+				jobManagerGateway.tell(new StopJob(jid), testActorGateway);
+
+				// - The test --
+				expectMsgClass(StoppingSuccess.class);
--- End diff --

Of course we would have to avoid the race condition by sending a 
`NotifyWhenJobRemoved` message to the `JobManager` and awaiting its response.
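The race presumably arises because `StoppingSuccess` only acknowledges the stop request, not that the job has actually finished. The idea of waiting for an explicit removal notification can be sketched in plain Java without Akka. Everything below is hypothetical: `StoppableLoopTask` only mimics a task that honours a stop signal, and `MiniJob.notifyWhenRemoved()` plays the role that a `NotifyWhenJobRemoved` request to the `JobManager` would play in the real test.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stoppable task: loops until stop() is called, then returns
// normally (a clean shutdown rather than a cancellation).
class StoppableLoopTask implements Runnable {
	private volatile boolean running = true;

	@Override
	public void run() {
		while (running) {
			Thread.yield(); // placeholder for emitting records
		}
	}

	void stop() {
		running = false;
	}
}

// Hypothetical single-task "job" that completes a future once its task has
// finished; this stands in for the JobManager's job-removed notification.
class MiniJob {
	private final StoppableLoopTask task = new StoppableLoopTask();
	private final CompletableFuture<Void> removed = new CompletableFuture<>();
	private final Thread worker = new Thread(() -> {
		task.run();
		removed.complete(null);
	});

	MiniJob() {
		worker.setDaemon(true); // never keep the JVM alive if a test fails
	}

	void start() {
		worker.start();
	}

	void stop() {
		task.stop();
	}

	CompletableFuture<Void> notifyWhenRemoved() {
		return removed;
	}
}

class StopSignalSketch {
	// Registers for the removal notification, sends the stop signal, and then
	// waits for the notification instead of assuming the job is already gone.
	static boolean stopAndAwaitRemoval(MiniJob job, long timeoutMillis) {
		CompletableFuture<Void> removed = job.notifyWhenRemoved();
		job.stop();
		try {
			removed.get(timeoutMillis, TimeUnit.MILLISECONDS);
			return true;
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			return false;
		} catch (ExecutionException | TimeoutException e) {
			return false;
		}
	}
}
```

The assertion then targets the notification, which only fires after the task has really returned, rather than the earlier acknowledgement.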




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46300384
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java ---

Now I understand... No specific reason. Just my code style. Will change it.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-12-01 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46300626
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java ---

I see. I can do that (I am not aware of all the messages that are available ;))




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46205018
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -569,6 +571,69 @@ public int compare(JobStatusMessage o1, JobStatusMessage o2) {
 	}
 
 	/**
+	 * Executes the STOP action.
+	 * 
+	 * @param args Command line arguments for the stop action.
+	 */
+	protected int stop(String[] args) {
+		LOG.info("Running 'stop' command.");
+
+		StopOptions options;
+		try {
+			options = CliFrontendParser.parseStopCommand(args);
+		}
+		catch (CliArgsException e) {
+			return handleArgException(e);
+		}
+		catch (Throwable t) {
+			return handleError(t);
+		}
+
+		// evaluate help flag
+		if (options.isPrintHelp()) {
+			CliFrontendParser.printHelpForStop();
+			return 0;
+		}
+
+		String[] stopArgs = options.getArgs();
+		JobID jobId;
+
+		if (stopArgs.length > 0) {
+			String jobIdString = stopArgs[0];
+			try {
+				jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
+			}
+			catch (Exception e) {
+				LOG.error("Error: The value for the Job ID is not a valid ID.");
+				System.out.println("Error: The value for the Job ID is not a valid ID.");
+				return 1;
+			}
+		}
+		else {
+			LOG.error("Missing JobID in the command line arguments.");
--- End diff --

See `cancel` -- same as above.
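The job-ID handling in the `stop` diff above amounts to decoding a hex string and rejecting malformed input. A minimal, self-contained sketch, assuming a job ID is 16 bytes written as 32 hexadecimal characters. `StopArgsSketch.parseJobId` is a hypothetical helper; the diff itself uses `StringUtils.hexStringToByte` and the `JobID` constructor. Because the excerpt cuts off before the missing-argument branch returns, the sketch only reports "no valid ID" and leaves exit codes to the caller.

```java
import java.util.Optional;

// Hypothetical sketch of the argument validation in the `stop` command.
class StopArgsSketch {

	// Assumption: a job ID is 16 bytes, i.e. 32 hex characters.
	static final int JOB_ID_BYTES = 16;

	// Returns the decoded ID bytes, or empty if the argument is missing or invalid.
	static Optional<byte[]> parseJobId(String[] stopArgs) {
		if (stopArgs.length == 0) {
			return Optional.empty(); // corresponds to "Missing JobID in the command line arguments."
		}
		String hex = stopArgs[0];
		if (hex.length() != 2 * JOB_ID_BYTES) {
			return Optional.empty(); // corresponds to "not a valid ID"
		}
		byte[] bytes = new byte[JOB_ID_BYTES];
		for (int i = 0; i < JOB_ID_BYTES; i++) {
			int hi = Character.digit(hex.charAt(2 * i), 16);
			int lo = Character.digit(hex.charAt(2 * i + 1), 16);
			if (hi < 0 || lo < 0) {
				return Optional.empty(); // non-hex character
			}
			bytes[i] = (byte) ((hi << 4) | lo);
		}
		return Optional.of(bytes);
	}
}
```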




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46204977
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -569,6 +571,69 @@ public int compare(JobStatusMessage o1, JobStatusMessage o2) {
 	}
 
 	/**
+	 * Executes the STOP action.
+	 * 
+	 * @param args Command line arguments for the stop action.
+	 */
+	protected int stop(String[] args) {
+		LOG.info("Running 'stop' command.");
+
+		StopOptions options;
+		try {
+			options = CliFrontendParser.parseStopCommand(args);
+		}
+		catch (CliArgsException e) {
+			return handleArgException(e);
+		}
+		catch (Throwable t) {
+			return handleError(t);
+		}
+
+		// evaluate help flag
+		if (options.isPrintHelp()) {
+			CliFrontendParser.printHelpForStop();
+			return 0;
+		}
+
+		String[] stopArgs = options.getArgs();
+		JobID jobId;
+
+		if (stopArgs.length > 0) {
+			String jobIdString = stopArgs[0];
+			try {
+				jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
+			}
+			catch (Exception e) {
+				LOG.error("Error: The value for the Job ID is not a valid ID.");
--- End diff --

We could, but `cancel` does not use it either. I think we should keep the
patterns consistent, so I would leave it as is. If you want it changed, we
should change it in `cancel`, too.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-160779157
  
Same failing test as before. We forgot to address this: "I just
investigated the failing test. The test is new and passed up to now. @uce
recently changed ExecutionGraph.cancel(), allowing a job to be canceled while
it is in state "FAILING". I am not sure if this is a good solution -- I would
remove the state transition from FAILING to CANCELLING. What do you think?"
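To make the question concrete, a deliberately simplified, hypothetical state machine: `SketchJobStatus` and `CancelTransitionSketch` are not Flink's `JobStatus` or `ExecutionGraph`, and the flag only models the two options discussed (keep the FAILING to CANCELLING transition, or remove it). The compare-and-set retry loop is one common way to guard such transitions against concurrent state changes.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, reduced set of job states for illustration only.
enum SketchJobStatus { CREATED, RUNNING, FAILING, FAILED, CANCELLING, CANCELED }

class CancelTransitionSketch {
	private final AtomicReference<SketchJobStatus> state;
	// true: cancel() may move FAILING -> CANCELLING (the recent change);
	// false: that transition is removed (the proposal above).
	private final boolean allowCancelWhileFailing;

	CancelTransitionSketch(SketchJobStatus initial, boolean allowCancelWhileFailing) {
		this.state = new AtomicReference<>(initial);
		this.allowCancelWhileFailing = allowCancelWhileFailing;
	}

	// Tries to move the job to CANCELLING; returns false if the current
	// state does not permit cancellation.
	boolean cancel() {
		while (true) {
			SketchJobStatus current = state.get();
			boolean cancellable = current == SketchJobStatus.CREATED
					|| current == SketchJobStatus.RUNNING
					|| (allowCancelWhileFailing && current == SketchJobStatus.FAILING);
			if (!cancellable) {
				return false;
			}
			if (state.compareAndSet(current, SketchJobStatus.CANCELLING)) {
				return true;
			}
			// Another thread changed the state concurrently: re-read and retry.
		}
	}

	SketchJobStatus getState() {
		return state.get();
	}
}
```

With the flag set to `false`, a job that is already FAILING keeps failing instead of switching to CANCELLING.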




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46205386
  
--- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import java.util.UUID;
+
+import akka.actor.*;
+import akka.testkit.JavaTestKit;
+
+import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import scala.Option;
+import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
+import static org.junit.Assert.*;
+
+public class CliFrontendStopTest {
--- End diff --

You might remember that I reworked a couple of tests in this PR and then
"removed" that rework to keep this PR clean. I still have it in a local branch,
and it contains this change together with the same change to the other test
classes in this package. I would keep it as is here and apply the changes in a
follow-up commit. OK?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-160774595
  
Updated this PR.


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46205848
  
--- Diff: flink-runtime-web/web-dashboard/package.json ---
@@ -7,27 +7,27 @@
   "devDependencies": {
 "browserify": "^9.0.3",
 "coffeeify": "^1.0.0",
-"gulp": "^3.8.11",
+"gulp": "^3.9.0",
 "gulp-browserify": "^0.5.1",
 "gulp-coffee": "^2.3.1",
 "gulp-coffeeify": "^0.1.8",
-"gulp-concat": "^2.5.2",
-"gulp-filter": "^2.0.2",
-"gulp-jade": "^1.0.0",
-"gulp-less": "^3.0.2",
-"gulp-livereload": "^3.8.0",
-"gulp-minify-css": "^1.0.0",
-"gulp-ng-annotate": "^0.5.2",
-"gulp-plumber": "^1.0.0",
+"gulp-concat": "^2.6.0",
+"gulp-filter": "^3.0.1",
+"gulp-jade": "^1.1.0",
+"gulp-less": "^3.0.5",
+"gulp-livereload": "^3.8.1",
+"gulp-minify-css": "^1.2.1",
+"gulp-ng-annotate": "^1.1.0",
+"gulp-plumber": "^1.0.1",
 "gulp-rename": "^1.2.0",
-"gulp-serve": "^0.3.1",
-"gulp-sourcemaps": "^1.5.1",
-"gulp-stylus": "^2.0.1",
-"gulp-uglify": "^1.1.0",
-"gulp-util": "^3.0.4",
+"gulp-serve": "^1.2.0",
+"gulp-sourcemaps": "^1.6.0",
+"gulp-stylus": "^2.1.0",
+"gulp-uglify": "^1.5.1",
+"gulp-util": "^3.0.7",
 "jade": "^1.9.2",
 "jadeify": "^4.1.0",
-"main-bower-files": "^2.6.2",
+"main-bower-files": "^2.9.0",
--- End diff --

I guess these are not dependencies but a list of the tools used for building...
Since everybody has a slightly different setup, this contains a lot of changes,
but I guess it is not relevant. @sachingoel0101 can you comment on this?


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46205569
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import java.util.UUID;
+
+import akka.actor.*;
+import akka.testkit.JavaTestKit;
+
+import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import scala.Option;
+import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
+import static org.junit.Assert.*;
+
+public class CliFrontendStopTest {
+
+   private static ActorSystem actorSystem;
+
+   @BeforeClass
+   public static void setup() {
+   pipeSystemOutToNull();
+   actorSystem = ActorSystem.create("TestingActorSystem");
+   }
+
+   @AfterClass
+   public static void teardown() {
+   JavaTestKit.shutdownActorSystem(actorSystem);
+   actorSystem = null;
+   }
+
+   @BeforeClass
+   public static void init() {
--- End diff --

Inspired by `CliFrontendListCancelTest`, which does the same. To be honest, I am
not sure. Does it make a difference? If I change it, it would make sense to
change it in both... Let me know what you think about it.
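As far as its documentation goes, JUnit 4 runs every `@BeforeClass` method of a class but does not specify an order among several such methods declared in the same class, so splitting setup across `setup()` and `init()` is only safe while the two are independent; here both also call `pipeSystemOutToNull()`. A minimal sketch of merging them into one method with a fixed order follows. The helper methods are stand-ins that merely record their calls, and the JUnit annotation is left as a comment so the sketch has no test dependencies.

```java
import java.util.ArrayList;
import java.util.List;

class CliFrontendStopTestSetupSketch {

    // Records the order in which the setup steps ran (illustration only).
    static final List<String> steps = new ArrayList<>();

    // Stand-in for CliFrontendTestUtils.pipeSystemOutToNull(); the real helper redirects System.out.
    static void pipeSystemOutToNull() {
        steps.add("pipeSystemOutToNull");
    }

    // Stand-in for CliFrontendTestUtils.clearGlobalConfiguration().
    static void clearGlobalConfiguration() {
        steps.add("clearGlobalConfiguration");
    }

    // Stand-in for actorSystem = ActorSystem.create("TestingActorSystem").
    static void createActorSystem() {
        steps.add("createActorSystem");
    }

    // In the test class this single method would carry JUnit 4's @BeforeClass and
    // replace both setup() and init(), so the steps always run in this order
    // and pipeSystemOutToNull() is called only once.
    static void setup() {
        pipeSystemOutToNull();
        clearGlobalConfiguration();
        createActorSystem();
    }
}
```

The same consolidation would apply to `CliFrontendListCancelTest` if it uses the same two-method layout.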


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46206488
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -402,21 +405,53 @@ public void onComplete(Throwable failure, Object success) throws Throwable {
}
}
 
+   public void stop() {
+   // sends stop RPC call
+
+   final SimpleSlot slot = this.assignedResource;
+
+   if (slot != null) {
+   final ActorGateway gateway = slot.getInstance().getActorGateway();
+
+   Future stopResult = gateway.retry(
+   new StopTask(attemptId),
+   NUM_STOP_CALL_TRIES,
+   timeout,
+   executionContext);
+
+   stopResult.onComplete(new OnComplete() {
+   @Override
+   public void onComplete(Throwable failure, Object success) throws Throwable {
+   if (failure != null) {
+   fail(new Exception("Task could not be stopped.", failure));
+   } else {
+   TaskOperationResult result = (TaskOperationResult) success;
+   if (!result.success()) {
+   LOG.debug("Stopping task call did not find task. Probably akka message call race.");
--- End diff --

Same pattern as the `onComplete` callback in `cancel` (see line 901). If we
change it, we should change both. I would keep it. If you want it changed for
both, let me know.
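The shared `cancel`/`stop` pattern sends an RPC with a bounded number of tries (`NUM_STOP_CALL_TRIES` in the diff) and inspects the outcome in a completion callback: a failed future fails the execution, while an unsuccessful `TaskOperationResult` is only logged as a probable message race. A minimal sketch of that control flow using `java.util.concurrent.CompletableFuture` instead of Akka/Scala futures; `retry`, `OperationResult`, and the returned strings are illustrative assumptions, not Flink's `ActorGateway.retry` API, and the per-try timeout is omitted.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class StopCallSketch {

    // Illustrative stand-in for TaskOperationResult.
    static final class OperationResult {
        final boolean success;

        OperationResult(boolean success) {
            this.success = success;
        }
    }

    // Re-invokes the asynchronous call until it succeeds or the tries are used up.
    static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> call, int tries) {
        CompletableFuture<T> attempt = call.get();
        if (tries <= 1) {
            return attempt;
        }
        return attempt
            .<CompletableFuture<T>>handle((value, error) -> error == null
                ? CompletableFuture.completedFuture(value)
                : retry(call, tries - 1))
            .thenCompose(next -> next);
    }

    // Mirrors the onComplete callback; returns the action the execution would take.
    static String onStopComplete(OperationResult result, Throwable failure) {
        if (failure != null) {
            return "fail: Task could not be stopped.";
        }
        if (!result.success) {
            return "log: Stopping task call did not find task.";
        }
        return "stopped";
    }
}
```

Keeping both callbacks on one helper like this would also make it easier to change `cancel` and `stop` together, as suggested.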


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46171068
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import java.util.UUID;
+
+import akka.actor.*;
+import akka.testkit.JavaTestKit;
+
+import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import scala.Option;
+import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
+import static org.junit.Assert.*;
+
+public class CliFrontendStopTest {
+
+   private static ActorSystem actorSystem;
+
+   @BeforeClass
+   public static void setup() {
+   pipeSystemOutToNull();
+   actorSystem = ActorSystem.create("TestingActorSystem");
+   }
+
+   @AfterClass
+   public static void teardown() {
+   JavaTestKit.shutdownActorSystem(actorSystem);
+   actorSystem = null;
+   }
+
+   @BeforeClass
+   public static void init() {
+   CliFrontendTestUtils.pipeSystemOutToNull();
+   CliFrontendTestUtils.clearGlobalConfiguration();
+   }
+
+   @Test
+   public void testStop() {
+   try {
+   // test unrecognized option
+   {
+   String[] parameters = { "-v", "-l" };
+   CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+   int retCode = testFrontend.stop(parameters);
+   assertTrue(retCode != 0);
+   }
+
+   // test missing job id
+   {
+   String[] parameters = {};
+   CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir());
+   int retCode = testFrontend.stop(parameters);
+   assertTrue(retCode != 0);
+   }
+
+   // test stop properly
+   {
+   JobID jid = new JobID();
+   String jidString = jid.toString();
+
+   final Option leaderSessionID = Option.apply(UUID.randomUUID());
+   final ActorRef jm = actorSystem.actorOf(Props.create(CliJobManager.class, jid));
+
+   final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
+
+   String[] parameters = { jidString };
+   StopTestCliFrontend testFrontend = new StopTestCliFrontend(gateway);
+
+   int retCode = testFrontend.stop(parameters);
+   assertTrue(retCode == 0);
+   }
+
+   // test stop properly
+   {
+   JobID jid1 = new JobID();
+   JobID jid2 = new JobID();
+
+   final Option leaderSessionID = Option.apply(UUID.randomUUID());
+   final ActorRef jm = actorSystem.actorOf(Props.create(CliJobManager.class, jid1));
+
+   final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID);
+
+   String[] parameters = { jid2.toString() };
+   StopTestCliFrontend testFrontend = new StopTestCliFrontend(gateway);
  

[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46171614
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
 ---
@@ -188,9 +189,12 @@ public WebRuntimeMonitor(
 
// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
.GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler()))
-   // DELETE is the preferred way of cancelling a job (Rest-conform)
+   // DELETE is the preferred way of canceling a job (Rest-conform)
.DELETE("/jobs/:jobid", handler(new JobCancellationHandler()))
 
+   // stop a job
+   .DELETE("/jobs/:jobid/stop", handler(new JobStoppingHandler()))
+
--- End diff --

What's the state of resolving this issue?


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46172044
  
--- Diff: 
flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee ---
@@ -216,4 +216,7 @@ angular.module('flinkApp')
 # proper "DELETE jobs//"
 $http.get "jobs/" + jobid + "/yarn-cancel"
 
+  @stopJob = (jobid) ->
+$http.delete "jobs/" + jobid + "/stop"
--- End diff --

I think it's quite important to support the stop call for YARN setups as
well.
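For cancellation, the monitor registers a GET alias (`/jobs/:jobid/yarn-cancel`) next to the REST-style DELETE route because, per the comment in `WebRuntimeMonitor`, requests through YARN have to be performed via GET. A minimal sketch of giving stop the same treatment; the `/jobs/:jobid/yarn-stop` path and this tiny router are hypothetical and only illustrate that both routes would share one handler.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

class StopRoutesSketch {

    // Maps "METHOD /path/pattern" to a handler that receives the :jobid value.
    final Map<String, Function<String, String>> routes = new LinkedHashMap<>();

    StopRoutesSketch() {
        Function<String, String> stopHandler = jobId -> "stopping " + jobId;
        // REST-style route, as in the PR.
        routes.put("DELETE /jobs/:jobid/stop", stopHandler);
        // Hypothetical GET alias for YARN, mirroring /jobs/:jobid/yarn-cancel.
        routes.put("GET /jobs/:jobid/yarn-stop", stopHandler);
    }

    // Returns the handler's answer, or null if no route matches.
    String dispatch(String method, String path) {
        String[] actual = path.split("/");
        for (Map.Entry<String, Function<String, String>> route : routes.entrySet()) {
            String[] parts = route.getKey().split(" ", 2);
            if (!parts[0].equals(method)) {
                continue;
            }
            String[] pattern = parts[1].split("/");
            if (pattern.length != actual.length) {
                continue;
            }
            String jobId = null;
            boolean matches = true;
            for (int i = 0; i < pattern.length; i++) {
                if (pattern[i].equals(":jobid")) {
                    jobId = actual[i];
                } else if (!pattern[i].equals(actual[i])) {
                    matches = false;
                    break;
                }
            }
            if (matches) {
                return route.getValue().apply(jobId);
            }
        }
        return null;
    }
}
```

The dashboard's `stopJob` would then need to call the GET variant when running behind the YARN proxy, as `cancelJob` already does with `yarn-cancel`.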


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46173267
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -1098,8 +1132,8 @@ public void registerExecutionListener(ActorGateway listener) {
this.executionListenerActors.add(listener);
}
}
-   
-   
+
+
--- End diff --

Two-line line break.


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46172301
  
--- Diff: flink-runtime-web/web-dashboard/package.json ---
@@ -7,27 +7,27 @@
   "devDependencies": {
 "browserify": "^9.0.3",
 "coffeeify": "^1.0.0",
-"gulp": "^3.8.11",
+"gulp": "^3.9.0",
 "gulp-browserify": "^0.5.1",
 "gulp-coffee": "^2.3.1",
 "gulp-coffeeify": "^0.1.8",
-"gulp-concat": "^2.5.2",
-"gulp-filter": "^2.0.2",
-"gulp-jade": "^1.0.0",
-"gulp-less": "^3.0.2",
-"gulp-livereload": "^3.8.0",
-"gulp-minify-css": "^1.0.0",
-"gulp-ng-annotate": "^0.5.2",
-"gulp-plumber": "^1.0.0",
+"gulp-concat": "^2.6.0",
+"gulp-filter": "^3.0.1",
+"gulp-jade": "^1.1.0",
+"gulp-less": "^3.0.5",
+"gulp-livereload": "^3.8.1",
+"gulp-minify-css": "^1.2.1",
+"gulp-ng-annotate": "^1.1.0",
+"gulp-plumber": "^1.0.1",
 "gulp-rename": "^1.2.0",
-"gulp-serve": "^0.3.1",
-"gulp-sourcemaps": "^1.5.1",
-"gulp-stylus": "^2.0.1",
-"gulp-uglify": "^1.1.0",
-"gulp-util": "^3.0.4",
+"gulp-serve": "^1.2.0",
+"gulp-sourcemaps": "^1.6.0",
+"gulp-stylus": "^2.1.0",
+"gulp-uglify": "^1.5.1",
+"gulp-util": "^3.0.7",
 "jade": "^1.9.2",
 "jadeify": "^4.1.0",
-"main-bower-files": "^2.6.2",
+"main-bower-files": "^2.9.0",
--- End diff --

Just out of curiosity, why do we change the versions of so many web 
dependencies?
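Part of the answer depends on whether a bump leaves the old caret range. With npm's caret syntax, `^x.y.z` with `x >= 1` accepts versions from `x.y.z` up to, but excluding, `(x+1).0.0`; for `^0.y.z` with `y >= 1` the upper bound is `0.(y+1).0`. So most bumps here (for example `gulp-concat` `^2.5.2` to `^2.6.0`) only raise the minimum version, while `gulp-filter` `^2.0.2` to `^3.0.1`, `gulp-serve` `^0.3.1` to `^1.2.0` and `gulp-ng-annotate` `^0.5.2` to `^1.1.0` move to versions the old ranges excluded. A minimal Java sketch of that check, ignoring pre-release tags and the `^0.0.z` case:

```java
class CaretRangeSketch {

    // Parses "x.y.z" into three integers (no pre-release or build metadata).
    static int[] parse(String version) {
        String[] parts = version.split("\\.");
        return new int[] {
            Integer.parseInt(parts[0]), Integer.parseInt(parts[1]), Integer.parseInt(parts[2])
        };
    }

    // Lexicographic comparison of major, minor, patch.
    static int compare(int[] a, int[] b) {
        for (int i = 0; i < 3; i++) {
            if (a[i] != b[i]) {
                return Integer.compare(a[i], b[i]);
            }
        }
        return 0;
    }

    // True if version satisfies "^base"; assumes base is not of the form 0.0.z.
    static boolean satisfiesCaret(String base, String version) {
        int[] lower = parse(base);
        int[] v = parse(version);
        int[] upper = lower[0] > 0
            ? new int[] {lower[0] + 1, 0, 0}
            : new int[] {0, lower[1] + 1, 0};
        return compare(v, lower) >= 0 && compare(v, upper) < 0;
    }
}
```

For example, `satisfiesCaret("2.5.2", "2.6.0")` is true, whereas `satisfiesCaret("2.0.2", "3.0.1")` is false.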


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46173139
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 ---
@@ -254,13 +262,14 @@ public ExecutionGraph(
ExecutionContext executionContext,
JobID jobId,
String jobName,
+   JobType jobType,
Configuration jobConfig,
FiniteDuration timeout,
List requiredJarFiles,
List requiredClasspaths,
ClassLoader userClassLoader) {
 
-   if (executionContext == null || jobId == null || jobName == null || jobConfig == null || userClassLoader == null) {
+   if (executionContext == null || jobId == null || jobName == null || jobType == null || jobConfig == null || userClassLoader == null) {
throw new NullPointerException();
--- End diff --

I think it would be good to know which of the fields was actually `null`. 
We could use

```
this.executionContext = Preconditions.checkNotNull(executionContext, "executionContext");
...
```
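A minimal sketch of such per-argument checks, using the JDK's `java.util.Objects.requireNonNull(obj, message)` so it runs without extra libraries; Guava's `Preconditions.checkNotNull(reference, errorMessage)` behaves the same way. The class and its field subset are illustrative, not the actual `ExecutionGraph` constructor, and `jobType` is a plain `String` here instead of the PR's `JobType`.

```java
import java.util.Objects;

class ExecutionGraphArgsSketch {

    final Object executionContext;
    final String jobName;
    final String jobType; // stand-in for the PR's JobType

    ExecutionGraphArgsSketch(Object executionContext, String jobName, String jobType) {
        // Each check names the offending argument in the NullPointerException message,
        // instead of one combined "if (a == null || b == null ...)" test.
        this.executionContext = Objects.requireNonNull(executionContext, "executionContext");
        this.jobName = Objects.requireNonNull(jobName, "jobName");
        this.jobType = Objects.requireNonNull(jobType, "jobType");
    }
}
```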


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46174663
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
 ---
@@ -59,6 +59,15 @@ object TaskMessages {
 extends TaskMessage with RequiresLeaderSessionID
 
   /**
+   * Stops the task associated with [[attemptID]]. The result is sent back to the sender as a
+   * [[TaskOperationResult]] message.
+   *
+   * @param attemptID The task's execution attempt ID.
+   */
+  case class StopTask(attemptID: ExecutionAttemptID)
+extends TaskMessage
--- End diff --

This class should also extend `RequiresLeaderSessionID`. 
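Roughly, messages marked with `RequiresLeaderSessionID` are sent together with the sender's current leader session ID, and the receiver drops them when that ID does not match its own, so a stop request from a deposed leader is ignored. A minimal sketch of that filtering idea; `LeaderSessionMessage`, `decorate`, and `accept` are hypothetical simplifications, not Flink's actual Akka-based classes.

```java
import java.util.UUID;

class LeaderSessionSketch {

    // Marker interface, standing in for Flink's RequiresLeaderSessionID.
    interface RequiresLeaderSessionID {}

    static final class StopTask implements RequiresLeaderSessionID {
        final String attemptId;

        StopTask(String attemptId) {
            this.attemptId = attemptId;
        }
    }

    // Envelope carrying the sender's view of the current leader session.
    static final class LeaderSessionMessage {
        final UUID leaderSessionId;
        final Object message;

        LeaderSessionMessage(UUID leaderSessionId, Object message) {
            this.leaderSessionId = leaderSessionId;
            this.message = message;
        }
    }

    // Sender side: wrap marked messages with the leader session ID.
    static Object decorate(Object message, UUID leaderSessionId) {
        return message instanceof RequiresLeaderSessionID
            ? new LeaderSessionMessage(leaderSessionId, message)
            : message;
    }

    // Receiver side: returns the message to process, or null if it must be dropped.
    static Object accept(Object received, UUID currentLeaderSessionId) {
        if (received instanceof LeaderSessionMessage) {
            LeaderSessionMessage wrapped = (LeaderSessionMessage) received;
            return currentLeaderSessionId.equals(wrapped.leaderSessionId) ? wrapped.message : null;
        }
        if (received instanceof RequiresLeaderSessionID) {
            // A marked message that arrives without a session ID is rejected.
            return null;
        }
        return received;
    }
}
```

Without the marker, `decorate` would pass `StopTask` through unwrapped and the receiver could not tell a stale request from a current one.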


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46172807
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 ---
@@ -402,21 +405,53 @@ public void onComplete(Throwable failure, Object success) throws Throwable {
}
}
 
+   public void stop() {
+   // sends stop RPC call
+
+   final SimpleSlot slot = this.assignedResource;
+
+   if (slot != null) {
+   final ActorGateway gateway = slot.getInstance().getActorGateway();
+
+   Future stopResult = gateway.retry(
+   new StopTask(attemptId),
+   NUM_STOP_CALL_TRIES,
+   timeout,
+   executionContext);
+
+   stopResult.onComplete(new OnComplete() {
+   @Override
+   public void onComplete(Throwable failure, Object success) throws Throwable {
+   if (failure != null) {
+   fail(new Exception("Task could not be stopped.", failure));
+   } else {
+   TaskOperationResult result = (TaskOperationResult) success;
+   if (!result.success()) {
+   LOG.debug("Stopping task call did not find task. Probably akka message call race.");
--- End diff --

Also log this on `INFO` level.


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46174475
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
 ---
@@ -95,6 +95,14 @@ object JobManagerMessages {
   case class CancelJob(jobID: JobID) extends RequiresLeaderSessionID
 
   /**
+   * Stops a (streaming) job with the given [[jobID]] at the JobManager. The result of
+   * stopping is sent back to the sender as a [[StoppingResponse]] message.
+   *
+   * @param jobID
+   */
+  case class StopJob(jobID: JobID)
--- End diff --

`StopJob` should extend `RequiresLeaderSessionID` in order to cope with 
changing leaders.


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46159742
  
--- Diff: docs/apis/cli.md ---
@@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs.
  -s,--scheduled    Show only scheduled programs and their JobIDs
 
 
+Action "stop" stops a running program (streaming jobs only).
+
+  Syntax: stop [OPTIONS] 
+  "stop" action options:
+ -m,--jobmanager    Address of the JobManager (master) to which
+                    to connect. Specify 'yarn-cluster' as the
+                    JobManager to deploy a YARN cluster for the
+                    job. Use this flag to connect to a different
--- End diff --

Is this still on your TODO list, @rmetzger?


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46161133
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -569,6 +571,69 @@ public int compare(JobStatusMessage o1, JobStatusMessage o2) {
}
 
/**
+* Executes the STOP action.
+* 
+* @param args Command line arguments for the stop action.
+*/
+   protected int stop(String[] args) {
+   LOG.info("Running 'stop' command.");
+
+   StopOptions options;
+   try {
+   options = CliFrontendParser.parseStopCommand(args);
+   }
+   catch (CliArgsException e) {
+   return handleArgException(e);
+   }
+   catch (Throwable t) {
+   return handleError(t);
+   }
+
+   // evaluate help flag
+   if (options.isPrintHelp()) {
+   CliFrontendParser.printHelpForStop();
+   return 0;
+   }
+
+   String[] stopArgs = options.getArgs();
+   JobID jobId;
+
+   if (stopArgs.length > 0) {
+   String jobIdString = stopArgs[0];
+   try {
+   jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
+   }
+   catch (Exception e) {
+   LOG.error("Error: The value for the Job ID is not a valid ID.");
+   System.out.println("Error: The value for the Job ID is not a valid ID.");
+   return 1;
+   }
+   }
+   else {
+   LOG.error("Missing JobID in the command line arguments.");
--- End diff --

Maybe we can use `handleArgException` here.
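A minimal sketch of routing the missing-ID case through the same path as other argument errors: the check throws an argument exception, and a single handler turns it into an error report and a non-zero exit code (which is what the "test missing job id" case in `CliFrontendStopTest` expects). `CliArgsException` and `handleArgException` here are simplified stand-ins for the `CliFrontend` members of the same name; their bodies are assumptions.

```java
class StopArgsSketch {

    // Simplified stand-in for the CLI's CliArgsException.
    static final class CliArgsException extends Exception {
        CliArgsException(String message) {
            super(message);
        }
    }

    // Last reported error message (illustration only).
    static String lastError;

    // Simplified stand-in for CliFrontend.handleArgException: report and return exit code 1.
    static int handleArgException(Exception e) {
        lastError = e.getMessage();
        return 1;
    }

    // Throws instead of logging, so the caller has a single error path.
    static String requireJobIdArgument(String[] stopArgs) throws CliArgsException {
        if (stopArgs.length == 0) {
            throw new CliArgsException("Missing JobID in the command line arguments.");
        }
        return stopArgs[0];
    }

    static int stop(String[] stopArgs) {
        try {
            requireJobIdArgument(stopArgs);
            // The real command would parse the returned ID and send the stop request here.
            return 0;
        } catch (CliArgsException e) {
            return handleArgException(e);
        }
    }
}
```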


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46160278
  
--- Diff: docs/apis/cli.md ---
@@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs.
  -s,--scheduled    Show only scheduled programs and their JobIDs
 
 
+Action "stop" stops a running program (streaming jobs only).
+
+  Syntax: stop [OPTIONS] 
+  "stop" action options:
+ -m,--jobmanager    Address of the JobManager (master) to which
+                    to connect. Specify 'yarn-cluster' as the
+                    JobManager to deploy a YARN cluster for the
+                    job. Use this flag to connect to a different
--- End diff --

Yes, but it's not on the first page anymore ;)


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46160961
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -569,6 +571,69 @@ public int compare(JobStatusMessage o1, JobStatusMessage o2) {
}
 
/**
+* Executes the STOP action.
+* 
+* @param args Command line arguments for the stop action.
+*/
+   protected int stop(String[] args) {
+   LOG.info("Running 'stop' command.");
+
+   StopOptions options;
+   try {
+   options = CliFrontendParser.parseStopCommand(args);
+   }
+   catch (CliArgsException e) {
+   return handleArgException(e);
+   }
+   catch (Throwable t) {
+   return handleError(t);
+   }
+
+   // evaluate help flag
+   if (options.isPrintHelp()) {
+   CliFrontendParser.printHelpForStop();
+   return 0;
+   }
+
+   String[] stopArgs = options.getArgs();
+   JobID jobId;
+
+   if (stopArgs.length > 0) {
+   String jobIdString = stopArgs[0];
+   try {
+   jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
+   }
+   catch (Exception e) {
+   LOG.error("Error: The value for the Job ID is not a valid ID.");
--- End diff --

Maybe we can use the `handleError` method here.
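The job ID is parsed from its hexadecimal string form (`StringUtils.hexStringToByte` in the diff). A minimal sketch of a parser that fails with a descriptive exception, which a `handleError`-style method can then report uniformly instead of the separate `LOG.error`/`System.out.println` pair; the 16-byte (32 hex character) length check and the `handleError` body are assumptions for illustration.

```java
class JobIdParseSketch {

    static final int ID_BYTES = 16; // assumed size of a job ID (32 hex characters)

    // Converts a hex string into bytes, rejecting wrong lengths and non-hex characters.
    static byte[] parseHexId(String hex) {
        if (hex.length() != 2 * ID_BYTES) {
            throw new IllegalArgumentException("The value for the Job ID is not a valid ID: expected "
                + (2 * ID_BYTES) + " hex characters but got " + hex.length() + ".");
        }
        byte[] bytes = new byte[ID_BYTES];
        for (int i = 0; i < ID_BYTES; i++) {
            int high = Character.digit(hex.charAt(2 * i), 16);
            int low = Character.digit(hex.charAt(2 * i + 1), 16);
            if (high < 0 || low < 0) {
                throw new IllegalArgumentException(
                    "The value for the Job ID is not a valid ID: '" + hex + "' is not hexadecimal.");
            }
            bytes[i] = (byte) ((high << 4) | low);
        }
        return bytes;
    }

    // Last reported error message (illustration only).
    static String lastError;

    // Stand-in for CliFrontend.handleError: report the cause, return a non-zero exit code.
    static int handleError(Throwable t) {
        lastError = t.getMessage();
        return 1;
    }

    static int stop(String jobIdString) {
        try {
            parseHexId(jobIdString);
            // The real command would build a JobID from these bytes and send the stop request.
            return 0;
        } catch (IllegalArgumentException e) {
            return handleError(e);
        }
    }
}
```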


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46162472
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import java.util.UUID;
+
+import akka.actor.*;
+import akka.testkit.JavaTestKit;
+
+import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import scala.Option;
+import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
+import static org.junit.Assert.*;
+
+public class CliFrontendStopTest {
+
+   private static ActorSystem actorSystem;
+
+   @BeforeClass
+   public static void setup() {
+   pipeSystemOutToNull();
+   actorSystem = ActorSystem.create("TestingActorSystem");
+   }
+
+   @AfterClass
+   public static void teardown() {
+   JavaTestKit.shutdownActorSystem(actorSystem);
+   actorSystem = null;
+   }
+
+   @BeforeClass
+   public static void init() {
--- End diff --

Why do you have 2 `@BeforeClass` methods here?


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46162322
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java ---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client;
+
+import java.util.UUID;
+
+import akka.actor.*;
+import akka.testkit.JavaTestKit;
+
+import org.apache.flink.client.cli.CommandLineOptions;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import scala.Option;
+import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull;
+import static org.junit.Assert.*;
+
+public class CliFrontendStopTest {
--- End diff --

This class should extend `TestLogger`.


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r46160903
  
--- Diff: docs/apis/cli.md ---
@@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs.
  -s,--scheduled    Show only scheduled programs and their JobIDs
 
 
+Action "stop" stops a running program (streaming jobs only).
+
+  Syntax: stop [OPTIONS] 
+  "stop" action options:
+ -m,--jobmanager    Address of the JobManager (master) to which
+                    to connect. Specify 'yarn-cluster' as the
+                    JobManager to deploy a YARN cluster for the
+                    job. Use this flag to connect to a different
--- End diff --

Alright, but for the moment it would be OK to merge it as it is here, right?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-27 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-160100311
  
I'd volunteer as a shepherd of this PR.

@mjsax I'll review your PR next week at the latest. Sorry for the long 
period of inactivity on my side.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-23 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r45640243
  
--- Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.jade ---
@@ -43,6 +43,10 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job")
 span.navbar-info-button.btn.btn-default(ng-click="cancelJob($event)")
   | Cancel
 
+  .navbar-info.last.first(ng-if!="job.type=='STREAMING' && (job.state=='RUNNING' || job.state=='CREATED')")
--- End diff --

I'm not sure anymore... I just tried to reproduce my search... I guess 
it was one of these:
http://jade-lang.com/reference/code/
https://stackoverflow.com/questions/6926247/nodejs-jade-escape-markup




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-23 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r45640613
  
--- Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.jade ---
@@ -43,6 +43,10 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job")
 span.navbar-info-button.btn.btn-default(ng-click="cancelJob($event)")
   | Cancel
 
+  .navbar-info.last.first(ng-if!="job.type=='STREAMING' && (job.state=='RUNNING' || job.state=='CREATED')")
--- End diff --

Ah. That helps. Thanks. :)




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-23 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r45639014
  
--- Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.jade ---
@@ -43,6 +43,10 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job")
 span.navbar-info-button.btn.btn-default(ng-click="cancelJob($event)")
   | Cancel
 
+  .navbar-info.last.first(ng-if!="job.type=='STREAMING' && (job.state=='RUNNING' || job.state=='CREATED')")
--- End diff --

Yes. Quite odd indeed. Can you point out the resource where you saw this? 
I'm a little curious.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-23 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r45639008
  
--- Diff: flink-runtime-web/web-dashboard/web/partials/jobs/job.html ---
@@ -32,7 +32,7 @@
   - 
   {{ job['end-time'] | amDateFormat:'-MM-DD, H:mm:ss' }}
   {{job.duration | humanizeDuration:true}}
-  Cancel
+  Cancel
--- End diff --

You are right... That is an old version. I forgot to commit this file. This 
will be
```
Cancel
Stop
```
Just adding the second line.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-23 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-158912414
  
Just rebased. Can you please review, @StephanEwen @tillrohrmann?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-23 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-158913013
  
@sachingoel0101 Is the UI change ok?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-23 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-158929824
  
I just investigated the failing test. The test is new and passed up to 
now. @uce recently changed `ExecutionGraph.cancel()` to allow canceling a job 
when it is in state "FAILING". I am not sure this is a good solution; I 
would remove the state transition from FAILING to CANCELLING. What do you think?
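A minimal sketch of the transition rule in question, assuming a simplified job-status enum (`Status` and `CancelTransitions` are hypothetical names, not Flink's `ExecutionGraph` or `JobStatus` code). The `allowFromFailing` flag toggles between the recently changed behavior and the proposed one:

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, simplified job states; not Flink's JobStatus enum.
enum Status { CREATED, RUNNING, FAILING, FAILED, CANCELLING, CANCELED, FINISHED }

final class CancelTransitions {

    // States from which cancel() may move the job to CANCELLING.
    // allowFromFailing toggles the behavior under discussion.
    static Set<Status> cancellableStates(boolean allowFromFailing) {
        Set<Status> states = EnumSet.of(Status.CREATED, Status.RUNNING);
        if (allowFromFailing) {
            states.add(Status.FAILING);
        }
        return states;
    }

    // Returns the status after a cancel request; unchanged if the transition is not allowed.
    static Status cancel(Status current, boolean allowFromFailing) {
        return cancellableStates(allowFromFailing).contains(current)
                ? Status.CANCELLING
                : current;
    }
}
```

With the transition removed, a FAILING job simply continues to FAILED, so a test that cancels during failure would observe FAILING rather than CANCELLING.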




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-23 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r45634054
  
--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---
@@ -188,9 +189,12 @@ public WebRuntimeMonitor(
 
// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
.GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler()))
-   // DELETE is the preferred way of cancelling a job (Rest-conform)
+   // DELETE is the preferred way of canceling a job (Rest-conform)
.DELETE("/jobs/:jobid", handler(new JobCancellationHandler()))
 
+   // stop a job
+   .DELETE("/jobs/:jobid/stop", handler(new JobStoppingHandler()))
+
--- End diff --

This is somewhat counter-intuitive. `DELETE` is meant to remove the 
resource at the specified URI, so specifying an action as `/stop` is not a good 
idea. Perhaps it would be better to identify these actions as 
`/jobs/:jobid?mode=cancel` and `/jobs/:jobid?mode=stop`. @StephanEwen might 
have a better idea about this.
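A sketch of the query-parameter scheme proposed above, assuming a plain `key=value&...` query string without URL decoding; `JobDeleteDispatcher` and its `Action` enum are hypothetical and not part of the web monitor:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical dispatcher for "DELETE /jobs/:jobid?mode=cancel|stop".
final class JobDeleteDispatcher {

    enum Action { CANCEL, STOP }

    // Parses "a=1&b=2" into a map; a minimal parser without URL decoding.
    static Map<String, String> parseQuery(String query) {
        Map<String, String> params = new HashMap<>();
        if (query == null || query.isEmpty()) {
            return params;
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0) {
                params.put(pair.substring(0, eq), pair.substring(eq + 1));
            }
        }
        return params;
    }

    // Defaults to CANCEL so that a plain "DELETE /jobs/:jobid" keeps its current meaning.
    static Action actionFor(String query) {
        String mode = parseQuery(query).getOrDefault("mode", "cancel");
        switch (mode) {
            case "cancel":
                return Action.CANCEL;
            case "stop":
                return Action.STOP;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }
}
```

Defaulting to `CANCEL` is a design choice in this sketch: it keeps existing clients that send a bare `DELETE` working unchanged.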




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-23 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r45635435
  
--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---
@@ -188,9 +189,12 @@ public WebRuntimeMonitor(
 
// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
.GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler()))
-   // DELETE is the preferred way of cancelling a job (Rest-conform)
+   // DELETE is the preferred way of canceling a job (Rest-conform)
.DELETE("/jobs/:jobid", handler(new JobCancellationHandler()))
 
+   // stop a job
+   .DELETE("/jobs/:jobid/stop", handler(new JobStoppingHandler()))
+
--- End diff --

It will be changed to `GET` anyway. Would it still be an issue?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-23 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r45635941
  
--- Diff: flink-runtime-web/web-dashboard/web/partials/jobs/job.html ---
@@ -32,7 +32,7 @@
   - 
   {{ job['end-time'] | amDateFormat:'-MM-DD, H:mm:ss' }}
   {{job.duration | humanizeDuration:true}}
-  Cancel
+  Cancel
--- End diff --

This is weird. The stop button code isn't actually here. Are you sure this 
file is up-to-date? Can you run gulp again?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-23 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r45636371
  
--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---
@@ -188,9 +189,12 @@ public WebRuntimeMonitor(
 
// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
.GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler()))
-   // DELETE is the preferred way of cancelling a job (Rest-conform)
+   // DELETE is the preferred way of canceling a job (Rest-conform)
.DELETE("/jobs/:jobid", handler(new JobCancellationHandler()))
 
+   // stop a job
+   .DELETE("/jobs/:jobid/stop", handler(new JobStoppingHandler()))
+
--- End diff --

`DELETE` is more conformant with REST. `GET` is just a 
workaround to support YARN when working through the AM proxy page: YARN doesn't 
forward anything besides `GET` requests to the AM's HTTP server.
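A sketch of how one stop handler could be registered twice, mirroring the existing cancel routes: once under a `DELETE` route for REST-conform clients and once under a `GET` alias that can pass through the YARN proxy. `RouteTable` is a hypothetical toy router, not the router `WebRuntimeMonitor` uses, and the `/yarn-stop` path follows the suggestion made elsewhere in this thread:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical toy router keyed by "METHOD pattern"; handlers receive the job ID.
final class RouteTable {

    private final Map<String, Function<String, String>> routes = new HashMap<>();

    void add(String method, String pattern, Function<String, String> handler) {
        routes.put(method + " " + pattern, handler);
    }

    // Matches "/jobs/<id><suffix>" against "/jobs/:jobid<suffix>"; returns "404" if no route matches.
    String handle(String method, String path) {
        String prefix = "/jobs/";
        if (!path.startsWith(prefix)) {
            return "404";
        }
        String rest = path.substring(prefix.length());
        int slash = rest.indexOf('/');
        String jobId = slash < 0 ? rest : rest.substring(0, slash);
        String suffix = slash < 0 ? "" : rest.substring(slash);
        Function<String, String> handler = routes.get(method + " /jobs/:jobid" + suffix);
        return handler == null ? "404" : handler.apply(jobId);
    }

    static RouteTable withStopRoutes() {
        Function<String, String> stop = id -> "stop " + id; // one shared handler
        RouteTable table = new RouteTable();
        // REST-conform route for clients that can send DELETE.
        table.add("DELETE", "/jobs/:jobid/stop", stop);
        // GET alias, because the YARN proxy only forwards GET to the AM's HTTP server.
        table.add("GET", "/jobs/:jobid/yarn-stop", stop);
        return table;
    }
}
```

Sharing one handler instance keeps both entry points behaviorally identical; only the HTTP method and path differ.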




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-23 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r45634227
  
--- Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.jade ---
@@ -43,6 +43,10 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job")
 span.navbar-info-button.btn.btn-default(ng-click="cancelJob($event)")
   | Cancel
 
+  .navbar-info.last.first(ng-if!="job.type=='STREAMING' && (job.state=='RUNNING' || job.state=='CREATED')")
--- End diff --

Are you sure this works? :confused: 
The `!=` shouldn't be there.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-23 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r45634353
  
--- Diff: flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee ---
@@ -216,4 +216,7 @@ angular.module('flinkApp')
 # proper "DELETE jobs//"
 $http.get "jobs/" + jobid + "/yarn-cancel"
 
+  @stopJob = (jobid) ->
+$http.delete "jobs/" + jobid + "/stop"
--- End diff --

As I commented before, this should rather be a `GET` request at 
`/yarn-stop` to support Yarn cancellations.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-23 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r45635078
  
--- Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.jade ---
@@ -43,6 +43,10 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job")
 span.navbar-info-button.btn.btn-default(ng-click="cancelJob($event)")
   | Cancel
 
+  .navbar-info.last.first(ng-if!="job.type=='STREAMING' && (job.state=='RUNNING' || job.state=='CREATED')")
--- End diff --

The syntax is quite odd, but it is correct (I figured this out on the 
Internet). It is necessary to prevent `&&` from being changed to the HTML entities 
`&amp;&amp;` during compilation. I also tested it: the STOP button is only 
displayed for streaming jobs, and if I click it the job shuts down gracefully.
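A minimal illustration of the escaping described above, assuming Jade HTML-escapes `attr="..."` values by default while `attr!="..."` emits them unescaped; `AttrEscape` is a hypothetical helper, not Jade's implementation:

```java
// Hypothetical HTML attribute escaper, showing what an escaped
// attribute does to the Angular expression in ng-if.
final class AttrEscape {

    static String escape(String value) {
        StringBuilder sb = new StringBuilder(value.length());
        for (char c : value.toCharArray()) {
            switch (c) {
                case '&': sb.append("&amp;"); break;
                case '<': sb.append("&lt;"); break;
                case '>': sb.append("&gt;"); break;
                case '"': sb.append("&quot;"); break;
                default: sb.append(c);
            }
        }
        return sb.toString();
    }
}
```

Applied to `job.type=='STREAMING' && job.state=='RUNNING'`, the escaped form contains `&amp;&amp;` in place of `&&`, which is the transformation the unescaped `!=` attribute avoids.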




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-23 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r45635645
  
--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---
@@ -188,9 +189,12 @@ public WebRuntimeMonitor(
 
// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
.GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler()))
-   // DELETE is the preferred way of cancelling a job (Rest-conform)
+   // DELETE is the preferred way of canceling a job (Rest-conform)
.DELETE("/jobs/:jobid", handler(new JobCancellationHandler()))
 
+   // stop a job
+   .DELETE("/jobs/:jobid/stop", handler(new JobStoppingHandler()))
+
--- End diff --

Why are there actually both `GET` and `DELETE` routes for `cancel`? Why not just `GET`?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-19 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-158050729
  
Opened a new JIRA for the failing build. It seems to be an unstable test 
(the build passed on my personal Travis: 
https://travis-ci.org/mjsax/flink/builds/91941028). Please review this PR.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-18 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-157817216
  
Hi @mjsax, your changes to the CoffeeScript aren't ported to the actual 
JavaScript. It appears you forgot to run `gulp`.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-18 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-157832022
  
Thanks, great. It works now. I was not aware that I have to build the UI 
manually. Would it be possible to integrate it into the Maven build? At least we 
should provide a README on how to build it (or better, a script to run). What do 
you think?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-18 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-157833952
  
There is a readme: 
https://github.com/apache/flink/tree/master/flink-runtime-web




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-18 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-157834943
  
How could I miss this...?? Sorry. My own fault...




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-18 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-157712835
  
@sachingoel0101 I have a problem testing my changes to the WebUI. It seems 
that the browser does not load the correct (i.e., changed) scripts... Check out 
the screenshot: it only shows the unmodified `@cancelJob = (jobid) -> 
$http.delete "jobs/" + jobid`, and the inserted `stop` part is missing (I have 
almost no experience with web development).

I built the whole Flink project with `mvn -DskipTests clean install` and also 
deleted the Maven repository to get rid of old artifacts. Reloading (F5 and Ctrl+F5), 
clearing the cache, and using a private window did not help either.

Can you help me out? The only change I can see is the new "stop" button, 
so `jobs.html` is loaded correctly.

![screenshot - 11182015 - 02 20 59 pm](https://cloud.githubusercontent.com/assets/8959638/11242126/1959dfc4-8e00-11e5-9840-127174ffabc3.png)





[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-18 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-157901595
  
Stop in the WebUI is now working. After rebasing, I realized that cancel was 
changed to GET for YARN. Do we need to implement stop the same way for YARN, 
@mxm?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-17 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-157442077
  
I just rebased this. Due to the new JobManager WebUI, I added a second 
commit that adds a STOP button to the WebUI (which does not work yet). I will 
squash both commits later on. I pushed this rebase now so that the changes 
that are independent of the WebUI can already be reviewed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-17 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-157521508
  
I am confused about why the build failed... It worked locally **and** on my 
personal Travis: https://travis-ci.org/mjsax/flink/builds/91638981

Furthermore, if I understand the error message correctly, it complains that 
`MyErroneousTimestampSource` does not implement the method `stop()`; however, the 
code clearly shows that the method was added... (line 587, 588).
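A minimal sketch of the contract the compiler is checking, assuming an interface that declares both `cancel()` and `stop()`; the names `StoppableSource` and `CountingSource` are hypothetical and the PR's actual interface may differ. Once `stop()` is declared, every concrete implementing source, including test sources such as `MyErroneousTimestampSource`, must provide it. In this sketch a stopped source stops emitting and returns from `run()` normally:

```java
import java.util.function.Consumer;

// Hypothetical contract, not Flink's API: cancel() aborts the source,
// stop() asks it to stop emitting so downstream operators can finish.
interface StoppableSource<T> {
    void run(Consumer<T> out) throws Exception;
    void cancel();
    void stop();
}

// Emits 0, 1, 2, ... up to a limit unless cancelled or stopped first.
final class CountingSource implements StoppableSource<Long> {
    private volatile boolean running = true;
    private final long limit;

    CountingSource(long limit) {
        this.limit = limit;
    }

    @Override
    public void run(Consumer<Long> out) {
        for (long i = 0; running && i < limit; i++) {
            out.accept(i);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void stop() {
        running = false; // stop emitting; run() then returns normally
    }
}
```

At the source level both methods may just flip the flag; the difference the PR describes lies in how the runtime treats the job afterwards (stopped jobs let operators finish processing).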




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-11-17 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-157529908
  
Travis is caching the contents of the .m2 directory. Maybe that caused some 
weird issues.

I have also had Travis issues that made tests fail in very weird ways. If the 
problem persists with the next commit, we can try a few things.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-10-20 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-149580271
  
Is there any news here?




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-27 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-143539717
  
Done. I split the PR and squashed the commit. @StephanEwen please let me 
know if it is ok.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-24 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r40334026
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
@@ -452,6 +453,73 @@ public void cancel(JobID jobId) throws Exception {
}
}
 
+   /**
+* Stops a program on Flink cluster whose job-manager is configured in this client's configuration.
+* Stopping works only for streaming programs. Be aware, that the program might continue to run for
+* a while after sending the stop command, because after sources stopped to emit data all operators
+* need to finish processing.
+* 
+* @param jobId
+*the job ID of the streaming program to stop
+* @throws ProgramStopException
+* If the job ID is invalid (ie, is unknown or refers to a batch job) or if sending the stop signal
+* failed. That might be due to an I/O problem, ie, the job-manager is unreachable.
+*/
+   public void stop(JobID jobId) throws Exception {
+   final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
+   final FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
+
+   ActorSystem actorSystem;
+   try {
+   actorSystem = JobClient.startJobClientActorSystem(configuration);
+   } catch (Exception e) {
+   throw new ProgramStopException("Could not start client actor system.", e);
+   }
+
+   try {
+   ActorGateway jobManagerGateway;
+
+   LeaderRetrievalService leaderRetrievalService;
+
+   try {
+   leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+   } catch (Exception e) {
+   throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
+   }
+
+   try {
+   jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
+   leaderRetrievalService,
+   actorSystem,
+   lookupTimeout);
+   } catch (LeaderRetrievalException e) {
+   throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e);
+   }
+
+   Future response;
+   try {
+   response = jobManagerGateway.ask(new StopJob(jobId), timeout);
+   } catch (Exception e) {
+   throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
+   }
+
+   Object result = Await.result(response, timeout);
+
+   if (result instanceof JobManagerMessages.StoppingSuccess) {
+   LOG.debug("Job stopping with ID " + jobId + " succeeded.");
+   } else if (result instanceof JobManagerMessages.StoppingFailure) {
+   Throwable t = ((JobManagerMessages.StoppingFailure) result).cause();
+   LOG.debug("Job stopping with ID " + jobId + " failed.", t);
--- End diff --

This should be logged on INFO level...
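A sketch of the suggested change using `java.util.logging` so that it runs standalone; `StoppingSuccess` and `StoppingFailure` here are hypothetical stand-ins for the `JobManagerMessages` types. With Flink's SLF4J logger, the failure branch would read `LOG.info("...", t)`:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class StopResultLogging {

    // Hypothetical stand-ins for JobManagerMessages.StoppingSuccess / StoppingFailure.
    static final class StoppingSuccess { }

    static final class StoppingFailure {
        final Throwable cause;

        StoppingFailure(Throwable cause) {
            this.cause = cause;
        }
    }

    private static final Logger LOG = Logger.getLogger(StopResultLogging.class.getName());

    // Logs the outcome at INFO, since stopping a job is a user-visible action.
    // Returns the message so the behavior can be checked without a log handler.
    static String logStopResult(String jobId, Object result) {
        if (result instanceof StoppingSuccess) {
            String msg = "Job stopping with ID " + jobId + " succeeded.";
            LOG.info(msg);
            return msg;
        } else if (result instanceof StoppingFailure) {
            String msg = "Job stopping with ID " + jobId + " failed.";
            LOG.log(Level.INFO, msg, ((StoppingFailure) result).cause);
            return msg;
        }
        throw new IllegalStateException("Unexpected response: " + result);
    }
}
```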




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-24 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r40333992
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java ---
@@ -452,6 +453,73 @@ public void cancel(JobID jobId) throws Exception {
}
}
 
+   /**
+* Stops a program on Flink cluster whose job-manager is configured in this client's configuration.
+* Stopping works only for streaming programs. Be aware, that the program might continue to run for
+* a while after sending the stop command, because after sources stopped to emit data all operators
+* need to finish processing.
+* 
+* @param jobId
+*the job ID of the streaming program to stop
+* @throws ProgramStopException
+* If the job ID is invalid (ie, is unknown or refers to a batch job) or if sending the stop signal
+* failed. That might be due to an I/O problem, ie, the job-manager is unreachable.
+*/
+   public void stop(JobID jobId) throws Exception {
+   final FiniteDuration timeout = AkkaUtils.getTimeout(configuration);
+   final FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration);
+
+   ActorSystem actorSystem;
+   try {
+   actorSystem = JobClient.startJobClientActorSystem(configuration);
+   } catch (Exception e) {
+   throw new ProgramStopException("Could not start client actor system.", e);
+   }
+
+   try {
+   ActorGateway jobManagerGateway;
+
+   LeaderRetrievalService leaderRetrievalService;
+
+   try {
+   leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration);
+   } catch (Exception e) {
+   throw new ProgramInvocationException("Could not create the leader retrieval service.", e);
+   }
+
+   try {
+   jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway(
+   leaderRetrievalService,
+   actorSystem,
+   lookupTimeout);
+   } catch (LeaderRetrievalException e) {
+   throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e);
+   }
+
+   Future response;
+   try {
+   response = jobManagerGateway.ask(new StopJob(jobId), timeout);
+   } catch (Exception e) {
+   throw new ProgramInvocationException("Failed to query the job manager gateway.", e);
+   }
+
+   Object result = Await.result(response, timeout);
+
+   if (result instanceof JobManagerMessages.StoppingSuccess) {
+   LOG.debug("Job stopping with ID " + jobId + " succeeded.");
--- End diff --

This should be logged on INFO level...




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-24 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-142970422
  
There are still some crucial issues with this pull request:

Most importantly, it changes parts that it does not need to change, altering 
system behavior. This shows in the 130+ changed files, for something 
that should probably touch around 20 files.

As per the contribution guidelines, it is very important that pull requests 
do not do that. We have seen too many cases where bugs were introduced because 
code parts were changed for no other reason than someone thinking that the code 
should look different in their opinion.

Reporting and changing such unrelated parts should always be a separate 
issue.

In the first few files, I see examples of
  - Changing the type of exceptions caught. We often explicitly catch 
Throwables, because we want to handle Errors as well, such as LinkageErrors 
which are common when the dynamic class loading fails.
  - Changing the way tests deal with exceptions. There are good reasons 
to explicitly catch and check expected exceptions (checking messages, being safe 
against Maven misconfigurations where exceptions propagating out of tests were not 
correctly recognized; I have seen that more than once so far).
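To illustrate the first point: a `NoClassDefFoundError` is a `LinkageError`, which is an `Error` and not an `Exception`, so it passes straight through `catch (Exception e)`. The sketch assumes a hypothetical `loadUserCode()` that fails during class loading; only `catch (Throwable t)` gets to handle it.

```java
class CatchThrowableDemo {

    /** Hypothetical user-code loading step that fails with a linkage problem. */
    static void loadUserCode() {
        // NoClassDefFoundError extends LinkageError extends Error.
        throw new NoClassDefFoundError("com/example/UserFunction");
    }

    /** Narrow handler: the Error is not caught here and propagates to the caller. */
    static int runCatchingException() {
        try {
            loadUserCode();
            return 0;
        } catch (Exception e) {
            return 1;
        }
    }

    /** Broad handler: catch (Throwable) also sees Errors such as LinkageError. */
    static int runCatchingThrowable() {
        try {
            loadUserCode();
            return 0;
        } catch (Throwable t) {
            System.err.println("Error while loading user code: " + t);
            return 1;
        }
    }

    public static void main(String[] args) {
        System.out.println("catch (Throwable) returned " + runCatchingThrowable());
        try {
            runCatchingException();
        } catch (LinkageError e) {
            System.out.println("catch (Exception) missed: " + e);
        }
    }
}
```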


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-24 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-142971028
  
What would really help to review this is a summary of how it actually 
works. We currently have to piece that together from the code, which takes 
quite some time...


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-24 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-142974060
  
There are also many whitespace changes in this PR, probably due to the IDE 
settings, which trim lines in all files that were opened...


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-24 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r40333103
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -588,15 +653,16 @@ protected int cancel(String[] args) {
ActorGateway jobManager = getJobManagerGateway(options);
Future response = jobManager.ask(new CancelJob(jobId), askTimeout);
 
-   try {
-   Await.result(response, askTimeout);
-   return 0;
-   }
-   catch (Exception e) {
-   throw new Exception("Canceling the job with ID " + jobId + " failed.", e);
+   Object rc = Await.result(response, askTimeout);
+
+   if (rc instanceof CancellationFailure) {
--- End diff --

See above


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-24 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-142967362
  
I've just looked into the implementation of `Task.cancelTask` and there we 
also spawn a new thread to cancel the task. Thus, the semantics are the same for 
the `stopExecution` call. Since it is consistent, I'm fine with this 
implementation.

Maybe we could call the `stopExecution` method `stopTask` so that it's 
consistent with `cancelTask`.


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-24 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r40331106
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -263,7 +268,7 @@ protected int run(String[] args) {
catch (CliArgsException e) {
return handleArgException(e);
}
-   catch (Throwable t) {
+   catch (Exception t) {
--- End diff --

Why change this? I think it was good before...


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-24 Thread StephanEwen
Github user StephanEwen commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r40333050
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -534,7 +536,70 @@ public int compare(JobStatusMessage o1, JobStatusMessage o2) {
"RunningJobs. Instead the response is of type " + result.getClass() + ".");
}
}
-   catch (Throwable t) {
+   catch (Exception t) {
+   return handleError(t);
+   }
+   }
+
+   /**
+* Executes the STOP action.
+* 
+* @param args Command line arguments for the stop action.
+*/
+   protected int stop(String[] args) {
+   LOG.info("Running 'stop' command.");
+
+   StopOptions options;
+   try {
+   options = CliFrontendParser.parseStopCommand(args);
+   }
+   catch (CliArgsException e) {
+   return handleArgException(e);
+   }
+   catch (Exception t) {
+   return handleError(t);
+   }
+
+   // evaluate help flag
+   if (options.isPrintHelp()) {
+   CliFrontendParser.printHelpForStop();
+   return 0;
+   }
+
+   String[] stopArgs = options.getArgs();
+   JobID jobId;
+
+   if (stopArgs.length > 0) {
+   String jobIdString = stopArgs[0];
+   try {
+   jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
+   }
+   catch (Exception e) {
+   LOG.error("Error: The value for the Job ID is not a valid ID.");
+   System.out.println("Error: The value for the Job ID is not a valid ID.");
+   return 1;
+   }
+   }
+   else {
+   LOG.error("Missing JobID in the command line arguments.");
+   System.out.println("Error: Specify a Job ID to stop a job.");
+   return 1;
+   }
+
+   try {
+   ActorGateway jobManager = getJobManagerGateway(options);
+   Future response = jobManager.ask(new StopJob(jobId), askTimeout);
+
+   Object rc = Await.result(response, askTimeout);
+
+   if (rc instanceof StoppingFailure) {
--- End diff --

I think that we decided not to use the built-in `Failure` object when 
transporting Exceptions between processes, if the Exceptions may be of 
user-defined classes (class loading issues). Using custom message types that 
explicitly require a `SerializedThrowable` makes it safer for users to 
recognize the need to supply a classloader.
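A rough sketch of the idea behind a dedicated failure message: the exception travels as bytes, and only a receiver that supplies a class loader able to resolve user classes turns it back into an object. `SerializedFailure` is a hypothetical name chosen for illustration; it does not reproduce the API of Flink's `SerializedThrowable`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

class SerializedFailureDemo {
    public static void main(String[] args) {
        SerializedFailure message = new SerializedFailure(new IllegalStateException("source failed to stop"));
        // The receiver decides which class loader resolves the exception class.
        Throwable restored = message.deserializeError(SerializedFailureDemo.class.getClassLoader());
        System.out.println(restored);
    }
}

/** Hypothetical failure message: the exception is shipped as bytes, not as a live object. */
final class SerializedFailure implements Serializable {
    private static final long serialVersionUID = 1L;

    private final byte[] serializedError;
    private final String errorDescription; // readable even if the class cannot be loaded

    SerializedFailure(Throwable error) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(error);
        } catch (IOException e) {
            // e.g. NotSerializableException if the error holds a non-serializable field
            throw new UncheckedIOException("Could not serialize error", e);
        }
        this.serializedError = bytes.toByteArray();
        this.errorDescription = error.toString();
    }

    /** Deserializes the error, resolving classes through the caller-supplied loader first. */
    Throwable deserializeError(ClassLoader userClassLoader) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(serializedError)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                try {
                    return Class.forName(desc.getName(), false, userClassLoader);
                } catch (ClassNotFoundException e) {
                    return super.resolveClass(desc);
                }
            }
        }) {
            return (Throwable) in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            // The class is unknown on this side: fall back to the textual description.
            return new Exception(errorDescription);
        }
    }
}
```

Because the constructor of the message type demands a `Throwable` and deserialization demands a `ClassLoader`, the class-loading concern is visible in the signatures rather than hidden inside a generic `Failure`.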


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-24 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-142973877
  
I am curious here: is the `stop()` method implemented differently from the 
`cancel()` method in any function?

What would speak against implementing the entire stopping behavior in a super 
lightweight way, by simply canceling the sources but transitioning them to 
`FINISHED` instead of `CANCELED` upon success? Would that not give us the same 
thing?


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-24 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-142999802
  
Thanks for the review! Just to clarify: I did not apply any changes "just 
for fun". But I agree that this PR contains more changes than are actually 
related to it. For example, I wrote some tests according to the given 
"test pattern" of a package and was told that this test pattern should be 
changed. This also covers catching exceptions... So if these changes are "wrong", 
I was instructed incorrectly. In order to "clean up", I changed the test 
pattern not just for my own new tests, but for the whole package (this might 
have been overeager...). Anyway, I agree that this PR should be split and 
the changes to the tests should go into a separate PR; we can discuss there 
which changes to the tests make sense.

How STOP works:
 - the `stop` signal is only sent to streaming source tasks
 - thus, batch jobs and streaming jobs are now distinguished (for batch 
jobs, no stop signal can be triggered at all)
 - only streaming sources implement the `Stoppable` interface and receive the 
stop signal
 - no explicit state change is necessary: after the streaming source tasks 
return from "run", they automatically go to "finishing" and close their 
output channels, which propagates through the whole ExecutionGraph, and all tasks 
finish automatically. Thus the overall job is marked as SUCCESS and not FAILED
 - for the JobManager UI, sources set a flag that they received the stop 
signal (which is displayed with a checkmark). This might be helpful if it takes 
some time before a source actually returns from `run()` and switches from 
RUNNING to FINISHING (otherwise, the user has no feedback on whether STOP was 
issued or not)
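The flow described in this list can be sketched roughly as follows. All names are hypothetical stand-ins: a `Stoppable` interface with a single `stop()` method, a `CountingSource` playing the streaming source, and a `StopDispatcher` playing the JobManager/TaskManager routing. The interfaces in the PR itself may differ.

```java
class StopSignalDemo {
    public static void main(String[] args) throws InterruptedException {
        CountingSource source = new CountingSource();
        Thread taskThread = new Thread(source, "Source (1/1)");
        taskThread.start();
        Thread.sleep(10);

        StopDispatcher.sendStop(source); // returns immediately
        taskThread.join();               // run() has returned normally
        System.out.println("source finished after " + source.emittedCount() + " records");

        try {
            StopDispatcher.sendStop(new Object()); // e.g. a batch operator
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}

/** Hypothetical marker: only tasks implementing this accept the stop signal. */
interface Stoppable {
    void stop();
}

/** Hypothetical unbounded source that ends cleanly once stop() has been called. */
class CountingSource implements Runnable, Stoppable {
    private volatile boolean running = true;
    private long emittedCount; // written by the task thread; read only after join()

    @Override
    public void run() {
        while (running) {
            emittedCount++; // stand-in for emitting a record downstream
        }
        // Returning without an exception is what lets the task go to FINISHING,
        // close its outputs and, eventually, lets the whole job end successfully.
    }

    @Override
    public void stop() {
        running = false; // a request only; run() exits at its next flag check
    }

    long emittedCount() {
        return emittedCount;
    }
}

final class StopDispatcher {
    /** Delivers STOP only to Stoppable tasks; anything else is rejected. */
    static void sendStop(Object task) {
        if (task instanceof Stoppable) {
            ((Stoppable) task).stop();
        } else {
            throw new UnsupportedOperationException(
                task.getClass().getSimpleName() + " does not support the stop signal");
        }
    }
}
```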

About `stop()` vs `cancel()`: we need a dedicated stop signal because the 
stop signal only goes to streaming sources (and no other tasks) and does not 
trigger an explicit state change. For the user function, I used `cancel()` in 
the beginning, and was told that you agreed on introducing a `stop()` 
method. So I only introduced it at the very end. I think there are some 
differences (see the JavaDoc I added for stop and cancel), so `UDF.stop()` might 
be useful (I am thinking of the exactly-once guarantee). If a job is canceled, it 
is a hard stop and exactly-once need not be preserved (tuples emitted by the 
sources are not processed, as all tasks of the job get canceled at the same time). 
Not so for stop, which gives exactly-once: the job is shut down cleanly from the 
sources to the sinks and all data emitted by the sources gets processed.

About whitespace formatting: there might be a few, but a lot of changes 
look like WS formatting but are not. This happens if a whole block is removed 
(I did remove some try-catch blocks) and there are empty lines (with WS 
indentation) within the block. The diff does not display the changes nicely, and 
consecutive lines look like WS formatting even if they are not (a tab was 
removed due to the removed block). I enable auto-formatting in Eclipse only for 
lines I changed...

I will split and update this PR and we can then take it from there. Makes 
sense?


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-22 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-142345277
  
Thanks. Makes sense. I'll wait for the feedback from Till and Stephan and fix 
this afterwards. This PR needs a rebase anyway, so I can do all of this 
when merging (if Till and Stephan give their OK).


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-22 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-142357249
  
Will have a look at this tomorrow. This touches some critical parts, so it 
needs a bit of time and calm to review.


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-22 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-142331659
  
I think the implementation of `stopExecution()` is fine. You are executing 
the call in a separate thread and you are logging exceptions. A blocking stop() 
method implemented by a user would not block the actor.
I would catch `Throwable` instead of `RuntimeException`, just to ensure 
that really no exception is thrown out of the thread.
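A minimal sketch of the pattern discussed here: the user's stop call runs on its own thread, so a blocking implementation cannot stall the caller, and the handler catches `Throwable` rather than `RuntimeException`. `AsyncStopCaller` and `stopAsync` are hypothetical names; this is not the `stopExecution` code from the PR.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class AsyncStopCaller {
    private static final Logger LOG = Logger.getLogger(AsyncStopCaller.class.getName());

    /**
     * Runs a user-supplied stop action on a dedicated daemon thread and returns at once,
     * so a slow or blocking implementation cannot block the caller (e.g. an actor).
     */
    static Thread stopAsync(Runnable userStop, String taskName) {
        Thread stopper = new Thread(() -> {
            try {
                userStop.run();
            } catch (Throwable t) {
                // Catch Throwable, not only RuntimeException, so that Errors thrown
                // by user code are logged here instead of escaping the thread.
                LOG.log(Level.WARNING, "Error while stopping task " + taskName, t);
            }
        }, "Stopper for " + taskName);
        stopper.setDaemon(true);
        stopper.start();
        return stopper; // the caller does not wait for the stop action
    }

    public static void main(String[] args) throws InterruptedException {
        Thread t = stopAsync(() -> { throw new NoClassDefFoundError("Missing"); }, "Source (1/1)");
        t.join();
        System.out.println("caller survived; stopper alive = " + t.isAlive());
    }
}
```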

This PR is touching very critical components of Flink. I would really 
appreciate a review by @tillrohrmann and @StephanEwen.


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-21 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-141926593
  
Any news here? What is the opinion on Till's concern about using a `Future`?


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-15 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-140301180
  
Hi, I cleaned up all old commits and put a new commit on top introducing 
`SourceFunction.stop()` and making the stop signal non-blocking by using a 
separate thread. Please give feedback.

Btw: Travis fails due to an unstable test. My own Travis build is green: 
https://travis-ci.org/mjsax/flink/builds/80344479


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-15 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-140335366
  
IMO, it would be better to wrap the `stopExecution` call in the 
`TaskManager` into a `Future`, for the following reasons: First of all, 
with the current implementation, you'll miss all exceptions which occur in the 
`stop` method. Secondly, you will send a positive `TaskOperationResult` back 
before the stopping was executed. I haven't checked the semantics of the 
`TaskOperationResult`, but it might be the case that the JM, upon receiving this 
message, thinks that the stop call was successfully executed.
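Roughly what wrapping the call in a future could look like, sketched with Java's `CompletableFuture` rather than the Scala futures used in `TaskManager.scala`. `StopResult` is a hypothetical stand-in for `TaskOperationResult`; the point is that the reply is sent only after the stop action has completed, and it carries any exception that action threw.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class FutureStopAck {

    /** Hypothetical reply, standing in for a TaskOperationResult. */
    static final class StopResult {
        final boolean success;
        final String description;

        StopResult(boolean success, String description) {
            this.success = success;
            this.description = description;
        }
    }

    /** Runs the stop action asynchronously and replies only once it has finished. */
    static CompletableFuture<Void> stopAndAcknowledge(
            Runnable stopAction, ExecutorService executor, Consumer<StopResult> replyTo) {
        return CompletableFuture.runAsync(stopAction, executor)
            .handle((ignored, error) -> {
                if (error == null) {
                    replyTo.accept(new StopResult(true, "stopped"));
                } else {
                    // runAsync wraps the thrown exception in a CompletionException.
                    Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                        ? error.getCause() : error;
                    replyTo.accept(new StopResult(false, cause.toString()));
                }
                return null;
            });
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            stopAndAcknowledge(
                    () -> { throw new IllegalStateException("not a streaming source"); },
                    executor,
                    result -> System.out.println("success=" + result.success + " " + result.description))
                .join();
        } finally {
            executor.shutdown();
        }
    }
}
```

Whether the reply should mean "signal delivered" or "stop completed" is exactly what is debated in this thread; this sketch assumes the latter.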


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-15 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-140375020
  
I think it should be fine. The `TaskOperationResult` should only indicate 
that the signal was delivered successfully (i.e., only sent to "streaming 
sources"). All other operators would raise an exception. If the user did not 
implement `stop()`, simply nothing should happen... (at least from my point of 
view: "stop" is only a request and the source does not have to obey it, in 
contrast to cancel). If this does not match the opinion of the majority, we can 
of course change it. (We would not need an additional task thread if we used a 
future, right?)

In order to see possible exceptions, I just added an additional try-catch 
around `SourceFunction.stop()` to log them.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-10 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-139194317
  
> The cancel only sets an internal variable.

That is probably true for all sources that Flink provides.
But the interface for sources is a public API, and trusting that users are 
implementing this properly is dangerous. It can basically lock the entire Flink 
cluster.




[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-08 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-138569872
  
The `cancel` in the SourceFunction interface? As I said, I think it should 
be non-blocking. The cancel only sets an internal variable. The source must 
then respond by exiting from its main loop and doing any cleanup in close or 
after the main-loop in `invoke`. 
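The contract described above, sketched with a hypothetical `PollingSource` (not an actual Flink source): `cancel()` only flips a volatile flag and returns at once, the main loop uses a bounded wait so it notices the flag, and cleanup runs on the task thread after the loop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class NonBlockingCancelDemo {
    public static void main(String[] args) throws InterruptedException {
        PollingSource source = new PollingSource();
        Thread task = new Thread(() -> {
            try {
                source.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        task.start();
        source.offer("a");
        source.offer("b");
        Thread.sleep(200);
        source.cancel(); // returns immediately
        task.join();
        System.out.println("emitted=" + source.emitted() + ", released=" + source.resourcesReleased());
    }
}

/** Hypothetical source following the "cancel only sets a flag" contract. */
class PollingSource {
    private volatile boolean running = true;
    private final BlockingQueue<String> external = new LinkedBlockingQueue<>();
    private int emitted;               // touched only by the task thread
    private boolean resourcesReleased; // read by other threads only after join()

    void offer(String record) {
        external.offer(record);
    }

    void run() throws InterruptedException {
        try {
            while (running) {
                // Bounded wait: the loop re-checks the flag at least every 50 ms.
                String record = external.poll(50, TimeUnit.MILLISECONDS);
                if (record != null) {
                    emitted++; // stand-in for emitting the record downstream
                }
            }
        } finally {
            // Cleanup happens here, on the task thread, not inside cancel().
            resourcesReleased = true;
        }
    }

    /** Non-blocking: only flips the flag. */
    void cancel() {
        running = false;
    }

    int emitted() {
        return emitted;
    }

    boolean resourcesReleased() {
        return resourcesReleased;
    }
}
```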


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-08 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-138587744
  
Yes. That is what I assumed when writing the code for this PR. The question 
is whether we simply assume that `cancel` behaves this way or try to enforce it 
somehow. Furthermore, the assumption about non-blocking behavior is not well 
documented. Should we at least update the JavaDoc for `cancel`?

This question is blocking this PR from getting merged. So we need to come to a 
conclusion on whether there is an issue or not (and whether I need to change my 
code or this PR can just get merged).

I personally would only update the JavaDoc.


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-08 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-138651481
  
I would be in favor of documenting it in the JavaDoc. What do you think, 
@rmetzger, as the one who probably knows the Kafka source best?


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-07 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38856545
  
--- Diff: docs/apis/cli.md ---
@@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs.
  -s,--scheduled    Show only scheduled programs and their JobIDs
 
 
+Action "stop" stops a running program (streaming jobs only).
+
+  Syntax: stop [OPTIONS] 
+  "stop" action options:
+ -m,--jobmanager    Address of the JobManager (master) to which
+   to connect. Specify 'yarn-cluster' as the
+   JobManager to deploy a YARN cluster for the
+   job. Use this flag to connect to a different
--- End diff --

If you put "-m yarn-cluster", it will deploy a YARN cluster from the 
client. The "-m yarn-cluster" option should only be available to the "run" 
command.
For stop, users should usually not specify the JobManager address, because
- in standalone mode, the JM address is in the conf
- in YARN mode, the JM address is in a special `.yarn-properties` file.
Only if that is not working, users can use the "-m ".


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-07 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38859306
  
--- Diff: docs/apis/cli.md ---
@@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs.
  -s,--scheduled    Show only scheduled programs and their JobIDs
 
 
+Action "stop" stops a running program (streaming jobs only).
+
+  Syntax: stop [OPTIONS] 
+  "stop" action options:
+ -m,--jobmanager    Address of the JobManager (master) to which
+   to connect. Specify 'yarn-cluster' as the
+   JobManager to deploy a YARN cluster for the
+   job. Use this flag to connect to a different
--- End diff --

I understand. Makes sense. I just copied it from other commands (and was 
wondering about it anyway ;) maybe I should have asked right away). The option 
`-m` is also specified for `cancel` and `list`. I guess it requires a complete 
cleanup. Should I just include this in this PR, or should we open a JIRA (and I 
just leave this inconsistency as is here)?


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-07 Thread mjsax
Github user mjsax commented on the pull request:

https://github.com/apache/flink/pull/750#issuecomment-138278500
  
Any input on the blocking/non-blocking question about `cancel` ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-07 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38863884
  
--- Diff: docs/apis/cli.md ---
@@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs.
  -s,--scheduled    Show only scheduled programs and their JobIDs
 
 
+Action "stop" stops a running program (streaming jobs only).
+
+  Syntax: stop [OPTIONS] 
+  "stop" action options:
+ -m,--jobmanager    Address of the JobManager (master) to which
+   to connect. Specify 'yarn-cluster' as the
+   JobManager to deploy a YARN cluster for the
+   job. Use this flag to connect to a different
--- End diff --

:+1:


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-07 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38863585
  
--- Diff: docs/apis/cli.md ---
@@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs.
  -s,--scheduled    Show only scheduled programs and their JobIDs
 
 
+Action "stop" stops a running program (streaming jobs only).
+
+  Syntax: stop [OPTIONS] 
+  "stop" action options:
+ -m,--jobmanager    Address of the JobManager (master) to which
+   to connect. Specify 'yarn-cluster' as the
+   JobManager to deploy a YARN cluster for the
+   job. Use this flag to connect to a different
--- End diff --

I'll write it down on my TODO list and fix it with the next YARN-related 
pull request (I'm planning to clean up the whole code a bit anyways)


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-03 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38635830
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -411,6 +411,23 @@ class TaskManager(
 log.debug(s"Cannot find task to fail for execution ${executionID})")
   }
 
+// stops a task
+case StopTask(executionID) =>
+  val task = runningTasks.get(executionID)
+  if (task != null) {
+try {
+  task.stopExecution()
--- End diff --

So what should the next step be?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38508028
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -411,6 +411,23 @@ class TaskManager(
 log.debug(s"Cannot find task to fail for execution ${executionID})")
   }
 
+// stops a task
+case StopTask(executionID) =>
+  val task = runningTasks.get(executionID)
+  if (task != null) {
+try {
+  task.stopExecution()
--- End diff --

About Kafka, I think the approach would be to have `cancel()` signal to the 
source that it should close itself, then have the actual shutdown/cleanup in 
`close()` of RichFunction. What do you think about this @rmetzger ?


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-02 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38507948
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -411,6 +411,23 @@ class TaskManager(
 log.debug(s"Cannot find task to fail for execution ${executionID})")
   }
 
+// stops a task
+case StopTask(executionID) =>
+  val task = runningTasks.get(executionID)
+  if (task != null) {
+try {
+  task.stopExecution()
--- End diff --

I agree with @mjsax that the cancel should be non-blocking. The method 
seems to be badly named; it should maybe have been named `stop()` from the 
beginning. Then it would be more obvious that it is just used for shutting down 
the source, no matter what the reason might be.

Unfortunately `close()` is also taken as part of `RichFunction`.

I think we have two options: keep `cancel` and use it for stopping, no 
matter what the reason is, or rename it to `stop()` and use it in the same way 
to make it clearer what it does.


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-02 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38511039
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -411,6 +411,23 @@ class TaskManager(
 log.debug(s"Cannot find task to fail for execution ${executionID})")
   }
 
+// stops a task
+case StopTask(executionID) =>
+  val task = runningTasks.get(executionID)
+  if (task != null) {
+try {
+  task.stopExecution()
--- End diff --

I have to admit that this is the first time I'm looking into this pull 
request ;)
Regarding the whole blocking / non-blocking discussion: even if we wrote 
into the javadocs that the cancel()/stop() call has to be implemented in a 
non-blocking fashion, there would still be users who get it wrong! It's just 
too risky to block the entire actor system with erroneous user code. (Aren't 
our windowing threads doing some magic in the close methods as well?!)
Isn't the canceling of Tasks in the task manager also done using separate 
canceling threads?

For the Kafka source: I believe we can move the "fetcher.close()" and 
offsetHandler.close() calls into the close() method as well. (We would probably 
need to add a cancel() method to the Fetcher interface.)
But rather than touching the Kafka consumer, I would make the stop() mechanism 
more robust.
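To illustrate keeping possibly blocking user code off the actor thread, here is a hypothetical sketch (the `Stoppable` and `StopDispatcher` names are made up, and this is not the TaskManager's actual code). The handler starts a dedicated thread for `stop()` plus a watchdog that only reports when the call does not return within a timeout, so the caller returns immediately even if the user implementation blocks.

```java
// Hypothetical hook for user code; nothing prevents an implementation from blocking.
interface Stoppable {
    void stop();
}

final class StopDispatcher {
    private StopDispatcher() {}

    // Returns immediately; the (possibly blocking) user call runs on its own thread.
    static Thread dispatchStop(Stoppable task, String name, long timeoutMillis) {
        Thread stopper = new Thread(task::stop, "stopper-" + name);
        stopper.setDaemon(true);
        stopper.start();

        // The watchdog can only report a hanging stop(); it does not kill the thread.
        Thread watchdog = new Thread(() -> {
            try {
                stopper.join(timeoutMillis);
                if (stopper.isAlive()) {
                    System.err.println("stop() of " + name + " still running after "
                            + timeoutMillis + " ms");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "stop-watchdog-" + name);
        watchdog.setDaemon(true);
        watchdog.start();
        return stopper;
    }
}
```

Java offers no safe way to forcibly terminate a thread, so a hanging `stop()` can only be detected and reported this way, not undone.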



[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-02 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38511106
  
--- Diff: docs/apis/cli.md ---
@@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs.
  -s,--scheduled    Show only scheduled programs and their JobIDs
 
 
+Action "stop" stops a running program (streaming jobs only).
+
+  Syntax: stop [OPTIONS] 
+  "stop" action options:
+ -m,--jobmanager    Address of the JobManager (master) to which
+                    to connect. Specify 'yarn-cluster' as the
+                    JobManager to deploy a YARN cluster for the
+                    job. Use this flag to connect to a different
--- End diff --

What happens when a user enters "stop -m yarn-cluster"?


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-02 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38515501
  
--- Diff: docs/apis/cli.md ---
@@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs.
  -s,--scheduled    Show only scheduled programs and their JobIDs
 
 
+Action "stop" stops a running program (streaming jobs only).
+
+  Syntax: stop [OPTIONS] 
+  "stop" action options:
+ -m,--jobmanager    Address of the JobManager (master) to which
+                    to connect. Specify 'yarn-cluster' as the
+                    JobManager to deploy a YARN cluster for the
+                    job. Use this flag to connect to a different
--- End diff --

I have never tried it on a YARN cluster. Do you think it needs special 
handling? Basically, the "stop" signal is sent to the JobManager, which forwards 
the signal to all sources. This should be independent of whether YARN is 
involved or not. But I don't have any experience with YARN, so tell me if I am 
wrong.
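The forwarding step described here can be sketched as follows, assuming a hypothetical, flattened view of a job (`TaskRef` and `StopForwarder` are illustrative names, not Flink classes): the JobManager walks the job's tasks and sends a stop message only to the sources. Because only the job's own task list is consulted, nothing in this step depends on the deployment mode; the sketch does not address how the client-side `-m yarn-cluster` option behaves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, flattened view of one task of a running job (not Flink's ExecutionGraph).
final class TaskRef {
    final String executionId;
    final boolean isSource;

    TaskRef(String executionId, boolean isSource) {
        this.executionId = executionId;
        this.isSource = isSource;
    }
}

final class StopForwarder {
    private StopForwarder() {}

    // Sends a stop message to every source task and returns the ids that were signalled.
    // Only the job's own task list is consulted; the deployment mode plays no role here.
    static List<String> forwardStop(List<TaskRef> tasks, Consumer<String> sendStopTask) {
        List<String> signalled = new ArrayList<>();
        for (TaskRef task : tasks) {
            if (task.isSource) {
                sendStopTask.accept(task.executionId); // e.g. a StopTask(executionID) message
                signalled.add(task.executionId);
            }
        }
        return signalled;
    }
}
```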


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-02 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38516042
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -411,6 +411,23 @@ class TaskManager(
 log.debug(s"Cannot find task to fail for execution ${executionID})")
   }
 
+// stops a task
+case StopTask(executionID) =>
+  val task = runningTasks.get(executionID)
+  if (task != null) {
+try {
+  task.stopExecution()
--- End diff --

Would it be possible to change the interface to an abstract class that 
provides a protected member `isRunning` (or similar) and implements a `public 
final void stop()` method that sets `isRunning = false`? This way, we would 
ensure that the call is non-blocking. The user would need to check the 
`isRunning` flag in the `run()` method.
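A sketch of the abstract-class variant, with hypothetical names (`StoppableSourceBase`, `CountingSource`): `stop()` is `final` and merely clears a `volatile` flag, so subclasses cannot turn it into a blocking call, and `run()` is expected to poll the flag.

```java
import java.util.function.Consumer;

// Hypothetical base class illustrating the proposal (not Flink's API).
abstract class StoppableSourceBase<T> {
    // volatile: stop() is called from a different thread than run()
    protected volatile boolean isRunning = true;

    // User code: emit records while isRunning is true, then return.
    public abstract void run(Consumer<T> out) throws Exception;

    // final, so subclasses cannot override it with blocking logic.
    public final void stop() {
        isRunning = false;
    }
}

// Example user source that honours the contract by polling the flag.
class CountingSource extends StoppableSourceBase<Long> {
    @Override
    public void run(Consumer<Long> out) {
        long i = 0;
        while (isRunning) {
            out.accept(i++);
        }
    }
}
```

The `volatile` modifier matters because `stop()` runs on a different thread than `run()`; without it the loop is not guaranteed to observe the update. A flag alone also does not wake up a `run()` that is blocked in I/O, so such sources would still need to poll with timeouts.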


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-09-01 Thread mjsax
Github user mjsax commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38458556
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -411,6 +411,23 @@ class TaskManager(
 log.debug(s"Cannot find task to fail for execution ${executionID})")
   }
 
+// stops a task
+case StopTask(executionID) =>
+  val task = runningTasks.get(executionID)
+  if (task != null) {
+try {
+  task.stopExecution()
--- End diff --

@aljoscha, do you have any input on this? I am not sure what the next step 
should be here. @tillrohrmann, do you have any other comments on the whole PR? 
I would like to finish this.


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-08-30 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38270665
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -411,6 +411,23 @@ class TaskManager(
 log.debug(s"Cannot find task to fail for execution ${executionID})")
   }
 
+// stops a task
+case StopTask(executionID) =>
+  val task = runningTasks.get(executionID)
+  if (task != null) {
+try {
+  task.stopExecution()
--- End diff --

I assume that stop calls should be idempotent. But I agree that if we 
document and check that all `cancel` calls are non-blocking, it should work as 
well.
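One simple way to make stop requests idempotent, sketched with a hypothetical `IdempotentStopper` wrapper: an `AtomicBoolean` guarded by `compareAndSet` ensures the user hook runs at most once, even if the same stop message arrives several times or from several threads.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical task wrapper (not Flink code): repeated stop requests run the hook once.
final class IdempotentStopper {
    private final AtomicBoolean stopRequested = new AtomicBoolean(false);
    private final Runnable userStop;

    IdempotentStopper(Runnable userStop) {
        this.userStop = userStop;
    }

    // Returns true only for the call that actually triggered the stop.
    boolean stop() {
        if (stopRequested.compareAndSet(false, true)) {
            userStop.run();
            return true;
        }
        return false; // duplicate stop message: nothing to do
    }
}
```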


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-08-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38179569
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTestCluster.java
 ---
@@ -25,15 +25,21 @@
 import backtype.storm.generated.StormTopology;
 import backtype.storm.generated.SubmitOptions;
 import backtype.storm.generated.TopologyInfo;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 
+import java.util.HashMap;
 import java.util.Map;
 
 /**
  * {@link FlinkTestCluster} mimics a Storm {@link LocalCluster} for ITCases via a {@link TestStreamEnvironment}.
  */
 public class FlinkTestCluster extends FlinkLocalCluster {
+   // TODO remove and look up job
--- End diff --

If you create a JIRA issue for this, then I think that you can remove the 
TODO.


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-08-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38179508
  
--- Diff: 
flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java
 ---
@@ -67,8 +67,10 @@ public static void main(final String[] args) throws Exception {
 
Utils.sleep(10 * 1000);
 
-   // TODO kill does no do anything so far
cluster.killTopology(topologyId);
+   // killing sends STOP signal, takes some time to clean up
+   Utils.sleep(1000);
--- End diff --

Hmm, what is the advantage of preserving this behaviour compared to not 
preserving it?


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-08-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38186444
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java ---
@@ -107,28 +89,28 @@ public InfoTestCliFrontend(int expectedDop) throws Exception {
@Override
protected Client getClient(CommandLineOptions options, ClassLoader loader, String programName, int par)
throws Exception
-   {
+   {
return new TestClient(expectedDop);
-   }
+   }
}
-   
+
private static final class TestClient extends Client {
-   
+
private final int expectedDop;
-   
+
private TestClient(int expectedDop) throws Exception {
super(new InetSocketAddress(InetAddress.getLocalHost(), 6176),
new Configuration(), CliFrontendInfoTest.class.getClassLoader(), -1);
-   
+
this.expectedDop = expectedDop;
}
-   
+
@Override
public String getOptimizedPlanAsJson(PackagedProgram prog, int parallelism)
throws CompilerException, ProgramInvocationException
-   {
+   {
--- End diff --

Same here with the indentation. But it might also be a GitHub rendering 
problem.


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-08-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38179870
  
--- Diff: 
flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 ---
@@ -411,6 +411,23 @@ class TaskManager(
 log.debug(s"Cannot find task to fail for execution ${executionID})")
   }
 
+// stops a task
+case StopTask(executionID) =>
+  val task = runningTasks.get(executionID)
+  if (task != null) {
+try {
+  task.stopExecution()
--- End diff --

Hmm, "most sources ..." does not sound like a hard contract to me. I agree 
with you that we should state this more clearly in the description.

What we could also do is execute the `stop` function in a `Future`.
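The suggestion refers to a (Scala) `Future`; to keep the examples in one language, here is the same idea sketched with Java's `CompletableFuture`. `AsyncStop` is a hypothetical helper: `stopAsync` submits the user's `stop()` to a separate executor and returns immediately, and a failure is reported when the future completes instead of propagating into the message handler.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

// Hypothetical helper (not Flink code).
final class AsyncStop {
    private AsyncStop() {}

    // Hands the user's stop() to the given executor and returns at once.
    static CompletableFuture<Void> stopAsync(Runnable userStop, ExecutorService executor) {
        return CompletableFuture.runAsync(userStop, executor)
                .whenComplete((ignored, error) -> {
                    if (error != null) {
                        // a real implementation would log this or report it back
                        System.err.println("stop() failed: " + error);
                    }
                });
    }
}
```

Passing an explicit executor matters: `runAsync` without one uses `ForkJoinPool.commonPool()`, where misbehaving user code would tie up threads shared with unrelated work.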


[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...

2015-08-28 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/750#discussion_r38186259
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -588,15 +653,16 @@ protected int cancel(String[] args) {
ActorGateway jobManager = getJobManagerGateway(options);
Future<Object> response = jobManager.ask(new CancelJob(jobId), askTimeout);
 
-   try {
-   Await.result(response, askTimeout);
-   return 0;
-   }
-   catch (Exception e) {
-   throw new Exception("Canceling the job with ID " + jobId + " failed.", e);
+   Object rc = Await.result(response, askTimeout);
+
+   if (rc instanceof CancellationFailure) {
--- End diff --

Maybe we can do the same with the `CancellationFailure` as with the 
`StopFailure`, because I haven't found a place where we use the `jobID` field 
of the `CancellationFailure` instance.

