[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46278355 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.BaseTestingActorGateway; +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.messages.TaskMessages; +import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.junit.AfterClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import scala.concurrent.ExecutionContext; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.whenNew; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(ExecutionVertex.class) +public class ExecutionVertexStopTest { + + @Test + public void testStop() throws Exception { + final JobVertexID jid = new JobVertexID(); + final ExecutionJobVertex ejv = getExecutionVertex(jid); + + Execution executionMock = mock(Execution.class); + 
whenNew(Execution.class).withAnyArguments().thenReturn(executionMock); + + final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], + AkkaUtils.getDefaultTimeout()); + + vertex.stop(); + + verify(executionMock).stop(); + } + + private static ActorSystem system; + + @AfterClass + public static void teardown(){ + if(system != null) { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + } + + static boolean receivedStopSignal; --- End diff -- Usually it's common practice in Flink's code base to put the static fields at the very beginning of the class definition. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46283023 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexStopTest.java --- @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.BaseTestingActorGateway; +import org.apache.flink.runtime.instance.Instance; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.messages.Messages; +import org.apache.flink.runtime.messages.TaskMessages; +import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.junit.AfterClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import akka.actor.ActorSystem; +import akka.testkit.JavaTestKit; +import scala.concurrent.ExecutionContext; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.setVertexState; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getExecutionVertex; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.getInstance; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.whenNew; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(ExecutionVertex.class) +public class ExecutionVertexStopTest { + + @Test + public void testStop() throws Exception { + final JobVertexID jid = new JobVertexID(); + final ExecutionJobVertex ejv = getExecutionVertex(jid); + + Execution executionMock = mock(Execution.class); + 
whenNew(Execution.class).withAnyArguments().thenReturn(executionMock); + + final ExecutionVertex vertex = new ExecutionVertex(ejv, 0, new IntermediateResult[0], + AkkaUtils.getDefaultTimeout()); + + vertex.stop(); + + verify(executionMock).stop(); + } + + private static ActorSystem system; + + @AfterClass + public static void teardown(){ + if(system != null) { + JavaTestKit.shutdownActorSystem(system); + system = null; + } + } + + static boolean receivedStopSignal; --- End diff -- Ok. Will do. (For a test, I thought it would be better to have it at the single test method where it is used.)
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46282619 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java --- @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.taskmanager; + +import java.lang.reflect.Field; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.Stoppable; +import org.apache.flink.runtime.memory.MemoryManager; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import scala.concurrent.duration.FiniteDuration; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ TaskDeploymentDescriptor.class, JobID.class, FiniteDuration.class }) +public class TaskStopTest { + private AbstractInvokable taskMock; + private Task task; + + public void doMocking() throws Exception { + TaskDeploymentDescriptor tddMock = mock(TaskDeploymentDescriptor.class); + when(tddMock.getNumberOfSubtasks()).thenReturn(1); + when(tddMock.getJobID()).thenReturn(mock(JobID.class)); + when(tddMock.getVertexID()).thenReturn(mock(JobVertexID.class)); + when(tddMock.getExecutionId()).thenReturn(mock(ExecutionAttemptID.class)); + when(tddMock.getTaskName()).thenReturn("taskName"); + 
when(tddMock.getJobConfiguration()).thenReturn(mock(Configuration.class)); + when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class)); + when(tddMock.getInvokableClassName()).thenReturn("className"); + + task = new Task(tddMock, mock(MemoryManager.class), mock(IOManager.class), mock(NetworkEnvironment.class), + mock(BroadcastVariableManager.class), mock(ActorGateway.class), mock(ActorGateway.class), + mock(FiniteDuration.class), mock(LibraryCacheManager.class), mock(FileCache.class), + mock(TaskManagerRuntimeInfo.class)); + Field f = task.getClass().getDeclaredField("invokable"); + f.setAccessible(true); + f.set(task, taskMock); --- End diff -- Why do you use class fields to pass in parameters like `taskMock`?
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46299763 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java --- @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.taskmanager; + +import java.lang.reflect.Field; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.Stoppable; +import org.apache.flink.runtime.memory.MemoryManager; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import scala.concurrent.duration.FiniteDuration; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ TaskDeploymentDescriptor.class, JobID.class, FiniteDuration.class }) +public class TaskStopTest { + private AbstractInvokable taskMock; + private Task task; + + public void doMocking() throws Exception { + TaskDeploymentDescriptor tddMock = mock(TaskDeploymentDescriptor.class); + when(tddMock.getNumberOfSubtasks()).thenReturn(1); + when(tddMock.getJobID()).thenReturn(mock(JobID.class)); + when(tddMock.getVertexID()).thenReturn(mock(JobVertexID.class)); + when(tddMock.getExecutionId()).thenReturn(mock(ExecutionAttemptID.class)); + when(tddMock.getTaskName()).thenReturn("taskName"); + 
when(tddMock.getJobConfiguration()).thenReturn(mock(Configuration.class)); + when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class)); + when(tddMock.getInvokableClassName()).thenReturn("className"); + + task = new Task(tddMock, mock(MemoryManager.class), mock(IOManager.class), mock(NetworkEnvironment.class), + mock(BroadcastVariableManager.class), mock(ActorGateway.class), mock(ActorGateway.class), + mock(FiniteDuration.class), mock(LibraryCacheManager.class), mock(FileCache.class), + mock(TaskManagerRuntimeInfo.class)); + Field f = task.getClass().getDeclaredField("invokable"); + f.setAccessible(true); + f.set(task, taskMock); --- End diff -- Actually, I was referring to `taskMock`. Why do you keep it as a class field instead of passing it as a parameter to the `doMocking` method?
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46300103 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java --- @@ -227,4 +235,121 @@ public void testRequestPartitionState() throws Exception { } }}; } + + @Test + public void testStopSignal() throws Exception { + new JavaTestKit(system) {{ + // Setup + TestingCluster cluster = null; + + try { + cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT()); + + // Create a task + final JobVertex sender = new JobVertex("Sender"); + sender.setParallelism(2); + sender.setInvokableClass(StoppableInvokable.class); + + final JobGraph jobGraph = new JobGraph("Blocking test job", JobType.STREAMING, sender); + final JobID jid = jobGraph.getJobID(); + + final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); + + // we can set the leader session ID to None because we don't use this gateway to send messages + final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null); + + // Submit the job and wait for all vertices to be running + jobManagerGateway.tell( + new SubmitJob( + jobGraph, + ListeningBehaviour.EXECUTION_RESULT), + testActorGateway); + expectMsgClass(JobManagerMessages.JobSubmitSuccess.class); + + jobManagerGateway.tell(new StopJob(jid), testActorGateway); + + // - The test -- + expectMsgClass(StoppingSuccess.class); --- End diff -- Of course we would have to avoid the race condition by sending a `NotifyWhenJobRemoved` message to the `JobManager` and awaiting its response.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46300384 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java --- @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.taskmanager; + +import java.lang.reflect.Field; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.filecache.FileCache; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.Stoppable; +import org.apache.flink.runtime.memory.MemoryManager; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import scala.concurrent.duration.FiniteDuration; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(PowerMockRunner.class) +@PrepareForTest({ TaskDeploymentDescriptor.class, JobID.class, FiniteDuration.class }) +public class TaskStopTest { + private AbstractInvokable taskMock; + private Task task; + + public void doMocking() throws Exception { + TaskDeploymentDescriptor tddMock = mock(TaskDeploymentDescriptor.class); + when(tddMock.getNumberOfSubtasks()).thenReturn(1); + when(tddMock.getJobID()).thenReturn(mock(JobID.class)); + when(tddMock.getVertexID()).thenReturn(mock(JobVertexID.class)); + when(tddMock.getExecutionId()).thenReturn(mock(ExecutionAttemptID.class)); + when(tddMock.getTaskName()).thenReturn("taskName"); + 
when(tddMock.getJobConfiguration()).thenReturn(mock(Configuration.class)); + when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class)); + when(tddMock.getInvokableClassName()).thenReturn("className"); + + task = new Task(tddMock, mock(MemoryManager.class), mock(IOManager.class), mock(NetworkEnvironment.class), + mock(BroadcastVariableManager.class), mock(ActorGateway.class), mock(ActorGateway.class), + mock(FiniteDuration.class), mock(LibraryCacheManager.class), mock(FileCache.class), + mock(TaskManagerRuntimeInfo.class)); + Field f = task.getClass().getDeclaredField("invokable"); + f.setAccessible(true); + f.set(task, taskMock); --- End diff -- Now I understand... No specific reason. Just my code style. Will change it.
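For readers following the `taskMock` discussion, here is a minimal sketch of the change being asked for: the mock is passed into `doMocking` and the prepared task is returned, so the test class needs no mutable fields. `Invokable`, `FakeTask`, and `TaskMockingSketch` are hypothetical stand-ins so the example compiles without Flink, Mockito, or PowerMock; they are not the real `AbstractInvokable` or `Task`, and the actual fix in the PR may look different.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for Flink's AbstractInvokable; not the real class.
interface Invokable {
    void invoke();
}

// Hypothetical stand-in for Flink's Task; only the private field matters here.
class FakeTask {
    private Invokable invokable; // private, so the test injects it reflectively

    Invokable currentInvokable() {
        return invokable;
    }
}

class TaskMockingSketch {
    // The mock arrives as a parameter and the task is returned,
    // instead of both living in fields of the test class.
    static FakeTask doMocking(Invokable taskMock) throws ReflectiveOperationException {
        FakeTask task = new FakeTask();
        Field f = FakeTask.class.getDeclaredField("invokable");
        f.setAccessible(true);
        f.set(task, taskMock);
        return task;
    }

    public static void main(String[] args) throws Exception {
        Invokable taskMock = () -> { };
        FakeTask task = doMocking(taskMock);
        // Reports whether the injected instance is the one we passed in.
        System.out.println(task.currentInvokable() == taskMock);
    }
}
```

With this shape, each test method creates its own mock and calls `doMocking(mock)`, so no state leaks between tests through shared fields.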
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46300626 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java --- @@ -227,4 +235,121 @@ public void testRequestPartitionState() throws Exception { } }}; } + + @Test + public void testStopSignal() throws Exception { + new JavaTestKit(system) {{ + // Setup + TestingCluster cluster = null; + + try { + cluster = startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT()); + + // Create a task + final JobVertex sender = new JobVertex("Sender"); + sender.setParallelism(2); + sender.setInvokableClass(StoppableInvokable.class); + + final JobGraph jobGraph = new JobGraph("Blocking test job", JobType.STREAMING, sender); + final JobID jid = jobGraph.getJobID(); + + final ActorGateway jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()); + + // we can set the leader session ID to None because we don't use this gateway to send messages + final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null); + + // Submit the job and wait for all vertices to be running + jobManagerGateway.tell( + new SubmitJob( + jobGraph, + ListeningBehaviour.EXECUTION_RESULT), + testActorGateway); + expectMsgClass(JobManagerMessages.JobSubmitSuccess.class); + + jobManagerGateway.tell(new StopJob(jid), testActorGateway); + + // - The test -- + expectMsgClass(StoppingSuccess.class); --- End diff -- I see. Can do that (I am not aware of all messages that are available ;))
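The race mentioned above can be illustrated with a plain-Java analogy: a stop request is acknowledged right away, while the job is removed asynchronously, so a test has to wait for a removal notification before asserting anything about the final state. This is only a sketch under stated assumptions. `FakeJobManager`, `stop`, `notifyWhenJobRemoved`, and `StopRaceSketch` are hypothetical names built on `CompletableFuture`; they are not Flink's actor messages, and the real test would use the `JobManager` messages named in the review.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a job manager; none of these names are Flink API.
class FakeJobManager {
    private final Set<String> jobs = ConcurrentHashMap.newKeySet();
    private final ConcurrentHashMap<String, CompletableFuture<Void>> removed =
            new ConcurrentHashMap<>();

    void submit(String jobId) {
        jobs.add(jobId);
        removed.put(jobId, new CompletableFuture<>());
    }

    // Acknowledges immediately; the actual removal happens later on another thread.
    boolean stop(String jobId) {
        CompletableFuture<Void> done = removed.get(jobId);
        if (done == null) {
            return false; // unknown job
        }
        CompletableFuture.runAsync(() -> {
            jobs.remove(jobId);
            done.complete(null);
        });
        return true;
    }

    // Completes once the job is gone; already complete for unknown jobs.
    CompletableFuture<Void> notifyWhenJobRemoved(String jobId) {
        return removed.getOrDefault(jobId, CompletableFuture.completedFuture(null));
    }

    boolean contains(String jobId) {
        return jobs.contains(jobId);
    }
}

class StopRaceSketch {
    static boolean stopAndAwaitRemoval(FakeJobManager jm, String jobId) throws Exception {
        if (!jm.stop(jobId)) {
            return false;
        }
        // Without this wait, contains(jobId) could still be true at this point.
        jm.notifyWhenJobRemoved(jobId).get(5, TimeUnit.SECONDS);
        return !jm.contains(jobId);
    }

    public static void main(String[] args) throws Exception {
        FakeJobManager jm = new FakeJobManager();
        jm.submit("job-1");
        System.out.println(stopAndAwaitRemoval(jm, "job-1"));
    }
}
```

The design point is that the acknowledgement of the stop request and the removal of the job are two separate events, and only the second one makes assertions about the final state safe.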
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46205018 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -569,6 +571,69 @@ public int compare(JobStatusMessage o1, JobStatusMessage o2) { } /** +* Executes the STOP action. +* +* @param args Command line arguments for the stop action. +*/ + protected int stop(String[] args) { + LOG.info("Running 'stop' command."); + + StopOptions options; + try { + options = CliFrontendParser.parseStopCommand(args); + } + catch (CliArgsException e) { + return handleArgException(e); + } + catch (Throwable t) { + return handleError(t); + } + + // evaluate help flag + if (options.isPrintHelp()) { + CliFrontendParser.printHelpForStop(); + return 0; + } + + String[] stopArgs = options.getArgs(); + JobID jobId; + + if (stopArgs.length > 0) { + String jobIdString = stopArgs[0]; + try { + jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); + } + catch (Exception e) { + LOG.error("Error: The value for the Job ID is not a valid ID."); + System.out.println("Error: The value for the Job ID is not a valid ID."); + return 1; + } + } + else { + LOG.error("Missing JobID in the command line arguments."); --- End diff -- See `cancel` -- same as above.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46204977 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -569,6 +571,69 @@ public int compare(JobStatusMessage o1, JobStatusMessage o2) { } /** +* Executes the STOP action. +* +* @param args Command line arguments for the stop action. +*/ + protected int stop(String[] args) { + LOG.info("Running 'stop' command."); + + StopOptions options; + try { + options = CliFrontendParser.parseStopCommand(args); + } + catch (CliArgsException e) { + return handleArgException(e); + } + catch (Throwable t) { + return handleError(t); + } + + // evaluate help flag + if (options.isPrintHelp()) { + CliFrontendParser.printHelpForStop(); + return 0; + } + + String[] stopArgs = options.getArgs(); + JobID jobId; + + if (stopArgs.length > 0) { + String jobIdString = stopArgs[0]; + try { + jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); + } + catch (Exception e) { + LOG.error("Error: The value for the Job ID is not a valid ID."); --- End diff -- We could. But `cancel` does not use it either. I guess we should have consistent patterns... I would just keep it as is. If you want to get it changed, we should change it in `cancel`, too.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-160779157 Same failing test as before. We forgot to address this: "I just investigated the failing test. The test is new and passed up to now. @uce changed ExecutionGraph.cancel() recently, allowing a job to be cancelled when it is in state "FAILING". I am not sure if this is a good solution -- I would remove the state transition from FAILING to CANCELLING. What do you think?"
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46205386 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client; + +import java.util.UUID; + +import akka.actor.*; +import akka.testkit.JavaTestKit; + +import org.apache.flink.client.cli.CommandLineOptions; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import scala.Option; +import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull; +import static org.junit.Assert.*; + +public class CliFrontendStopTest { --- End diff -- You might remember that I reworked a couple of tests in this PR that I "removed" to keep this PR clean. I still have this in a local branch -- and it contains this change -- together with the same change to the other test classes in this package. 
I would keep it as is here, and apply the changes in a follow-up commit. OK?
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-160774595 Updated this PR.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46205848 --- Diff: flink-runtime-web/web-dashboard/package.json --- @@ -7,27 +7,27 @@ "devDependencies": { "browserify": "^9.0.3", "coffeeify": "^1.0.0", -"gulp": "^3.8.11", +"gulp": "^3.9.0", "gulp-browserify": "^0.5.1", "gulp-coffee": "^2.3.1", "gulp-coffeeify": "^0.1.8", -"gulp-concat": "^2.5.2", -"gulp-filter": "^2.0.2", -"gulp-jade": "^1.0.0", -"gulp-less": "^3.0.2", -"gulp-livereload": "^3.8.0", -"gulp-minify-css": "^1.0.0", -"gulp-ng-annotate": "^0.5.2", -"gulp-plumber": "^1.0.0", +"gulp-concat": "^2.6.0", +"gulp-filter": "^3.0.1", +"gulp-jade": "^1.1.0", +"gulp-less": "^3.0.5", +"gulp-livereload": "^3.8.1", +"gulp-minify-css": "^1.2.1", +"gulp-ng-annotate": "^1.1.0", +"gulp-plumber": "^1.0.1", "gulp-rename": "^1.2.0", -"gulp-serve": "^0.3.1", -"gulp-sourcemaps": "^1.5.1", -"gulp-stylus": "^2.0.1", -"gulp-uglify": "^1.1.0", -"gulp-util": "^3.0.4", +"gulp-serve": "^1.2.0", +"gulp-sourcemaps": "^1.6.0", +"gulp-stylus": "^2.1.0", +"gulp-uglify": "^1.5.1", +"gulp-util": "^3.0.7", "jade": "^1.9.2", "jadeify": "^4.1.0", -"main-bower-files": "^2.6.2", +"main-bower-files": "^2.9.0", --- End diff -- I guess these are not dependencies but a list of used tools... As everybody has a slightly different setup, this contains a lot of changes -- but I guess it is not relevant. @sachingoel0101 can you comment on this?
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46205569 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client; + +import java.util.UUID; + +import akka.actor.*; +import akka.testkit.JavaTestKit; + +import org.apache.flink.client.cli.CommandLineOptions; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import scala.Option; +import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull; +import static org.junit.Assert.*; + +public class CliFrontendStopTest { + + private static ActorSystem actorSystem; + + @BeforeClass + public static void setup() { + pipeSystemOutToNull(); + actorSystem = ActorSystem.create("TestingActorSystem"); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(actorSystem); + actorSystem = null; + } + + @BeforeClass + public static void init() { --- End diff -- Inspired by `CliFrontendListCancelTest` -- it does the same. Not sure, to be honest. Does it make a difference? If I change it, it would make sense to change it in both... Let me know what you think about it. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46206488 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java --- @@ -402,21 +405,53 @@ public void onComplete(Throwable failure, Object success) throws Throwable { } } + public void stop() { + // sends stop RPC call + + final SimpleSlot slot = this.assignedResource; + + if (slot != null) { + final ActorGateway gateway = slot.getInstance().getActorGateway(); + + Future stopResult = gateway.retry( + new StopTask(attemptId), + NUM_STOP_CALL_TRIES, + timeout, + executionContext); + + stopResult.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object success) throws Throwable { + if (failure != null) { + fail(new Exception("Task could not be stopped.", failure)); + } else { + TaskOperationResult result = (TaskOperationResult) success; + if (!result.success()) { + LOG.debug("Stopping task call did not find task. Probably akka message call race."); --- End diff -- Same pattern as the `cancel` `onComplete` callback (see line 901) -- if we change it, we should change both. I would keep it. If you want it changed for both, let me know. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46171068 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client; + +import java.util.UUID; + +import akka.actor.*; +import akka.testkit.JavaTestKit; + +import org.apache.flink.client.cli.CommandLineOptions; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import scala.Option; +import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull; +import static org.junit.Assert.*; + +public class CliFrontendStopTest { + + private static ActorSystem actorSystem; + + @BeforeClass + public static void setup() { + pipeSystemOutToNull(); + actorSystem = ActorSystem.create("TestingActorSystem"); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(actorSystem); + actorSystem = null; + } + + @BeforeClass + public static void init() { + CliFrontendTestUtils.pipeSystemOutToNull(); + CliFrontendTestUtils.clearGlobalConfiguration(); + } + + @Test + public void testStop() { + try { + // test unrecognized option + { + String[] parameters = { "-v", "-l" }; + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + int retCode = testFrontend.stop(parameters); + assertTrue(retCode != 0); + } + + // test missing job id + { + String[] parameters = {}; + CliFrontend testFrontend = new CliFrontend(CliFrontendTestUtils.getConfigDir()); + int retCode = testFrontend.stop(parameters); + assertTrue(retCode != 0); + } + + // test stop properly + { + JobID jid = new JobID(); + String jidString = jid.toString(); + + final Option leaderSessionID = Option.apply(UUID.randomUUID()); + final ActorRef jm = actorSystem.actorOf(Props.create(CliJobManager.class, jid)); + + final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID); + + String[] parameters = { jidString }; + 
StopTestCliFrontend testFrontend = new StopTestCliFrontend(gateway); + + int retCode = testFrontend.stop(parameters); + assertTrue(retCode == 0); + } + + // test stop properly + { + JobID jid1 = new JobID(); + JobID jid2 = new JobID(); + + final Option leaderSessionID = Option.apply(UUID.randomUUID()); + final ActorRef jm = actorSystem.actorOf(Props.create(CliJobManager.class, jid1)); + + final ActorGateway gateway = new AkkaActorGateway(jm, leaderSessionID); + + String[] parameters = { jid2.toString() }; + StopTestCliFrontend testFrontend = new StopTestCliFrontend(gateway);
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46171614 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java --- @@ -188,9 +189,12 @@ public WebRuntimeMonitor( // Cancel a job via GET (for proper integration with YARN this has to be performed via GET) .GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler())) - // DELETE is the preferred way of cancelling a job (Rest-conform) + // DELETE is the preferred way of canceling a job (Rest-conform) .DELETE("/jobs/:jobid", handler(new JobCancellationHandler())) + // stop a job + .DELETE("/jobs/:jobid/stop", handler(new JobStoppingHandler())) + --- End diff -- What's the state of resolving this issue? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46172044 --- Diff: flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee --- @@ -216,4 +216,7 @@ angular.module('flinkApp') # proper "DELETE jobs//" $http.get "jobs/" + jobid + "/yarn-cancel" + @stopJob = (jobid) -> +$http.delete "jobs/" + jobid + "/stop" --- End diff -- I think, it's quite important to support the stop call for Yarn setups as well. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46173267 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -1098,8 +1132,8 @@ public void registerExecutionListener(ActorGateway listener) { this.executionListenerActors.add(listener); } } - - + + --- End diff -- Two line linebreak. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46172301 --- Diff: flink-runtime-web/web-dashboard/package.json --- @@ -7,27 +7,27 @@ "devDependencies": { "browserify": "^9.0.3", "coffeeify": "^1.0.0", -"gulp": "^3.8.11", +"gulp": "^3.9.0", "gulp-browserify": "^0.5.1", "gulp-coffee": "^2.3.1", "gulp-coffeeify": "^0.1.8", -"gulp-concat": "^2.5.2", -"gulp-filter": "^2.0.2", -"gulp-jade": "^1.0.0", -"gulp-less": "^3.0.2", -"gulp-livereload": "^3.8.0", -"gulp-minify-css": "^1.0.0", -"gulp-ng-annotate": "^0.5.2", -"gulp-plumber": "^1.0.0", +"gulp-concat": "^2.6.0", +"gulp-filter": "^3.0.1", +"gulp-jade": "^1.1.0", +"gulp-less": "^3.0.5", +"gulp-livereload": "^3.8.1", +"gulp-minify-css": "^1.2.1", +"gulp-ng-annotate": "^1.1.0", +"gulp-plumber": "^1.0.1", "gulp-rename": "^1.2.0", -"gulp-serve": "^0.3.1", -"gulp-sourcemaps": "^1.5.1", -"gulp-stylus": "^2.0.1", -"gulp-uglify": "^1.1.0", -"gulp-util": "^3.0.4", +"gulp-serve": "^1.2.0", +"gulp-sourcemaps": "^1.6.0", +"gulp-stylus": "^2.1.0", +"gulp-uglify": "^1.5.1", +"gulp-util": "^3.0.7", "jade": "^1.9.2", "jadeify": "^4.1.0", -"main-bower-files": "^2.6.2", +"main-bower-files": "^2.9.0", --- End diff -- Just out of curiosity, why do we change the versions of so many web dependencies? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46173139 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java --- @@ -254,13 +262,14 @@ public ExecutionGraph( ExecutionContext executionContext, JobID jobId, String jobName, + JobType jobType, Configuration jobConfig, FiniteDuration timeout, List requiredJarFiles, List requiredClasspaths, ClassLoader userClassLoader) { - if (executionContext == null || jobId == null || jobName == null || jobConfig == null || userClassLoader == null) { + if (executionContext == null || jobId == null || jobName == null || jobType == null || jobConfig == null || userClassLoader == null) { throw new NullPointerException(); --- End diff -- I think it would be good to know which of the fields was actually `null`. We could use ``` executionContext = Preconditions.checkNotNull(executionContext); ... ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
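A minimal sketch of the per-field check suggested above. So that it runs without Flink on the classpath it uses the JDK's `java.util.Objects.requireNonNull` as a stand-in for `Preconditions.checkNotNull`; the class `GraphConfig` and its three fields are hypothetical and only mirror the shape of the constructor in the diff.

```java
import java.util.Objects;

// Hypothetical stand-in for a constructor with several mandatory arguments.
final class GraphConfig {
    final String jobId;
    final String jobName;
    final String jobType;

    GraphConfig(String jobId, String jobName, String jobType) {
        // One check per argument: the exception message names the offending field,
        // unlike a single combined "a == null || b == null || ..." test.
        this.jobId = Objects.requireNonNull(jobId, "jobId");
        this.jobName = Objects.requireNonNull(jobName, "jobName");
        this.jobType = Objects.requireNonNull(jobType, "jobType");
    }
}
```

With this shape, passing `null` for `jobName` fails with a `NullPointerException` whose message is `jobName`, which is the information the combined check loses.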
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46174663 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala --- @@ -59,6 +59,15 @@ object TaskMessages { extends TaskMessage with RequiresLeaderSessionID /** + * Stops the task associated with [[attemptID]]. The result is sent back to the sender as a + * [[TaskOperationResult]] message. + * + * @param attemptID The task's execution attempt ID. + */ + case class StopTask(attemptID: ExecutionAttemptID) +extends TaskMessage --- End diff -- This class should also extend `RequiresLeaderSessionID`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46172807 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java --- @@ -402,21 +405,53 @@ public void onComplete(Throwable failure, Object success) throws Throwable { } } + public void stop() { + // sends stop RPC call + + final SimpleSlot slot = this.assignedResource; + + if (slot != null) { + final ActorGateway gateway = slot.getInstance().getActorGateway(); + + Future stopResult = gateway.retry( + new StopTask(attemptId), + NUM_STOP_CALL_TRIES, + timeout, + executionContext); + + stopResult.onComplete(new OnComplete() { + @Override + public void onComplete(Throwable failure, Object success) throws Throwable { + if (failure != null) { + fail(new Exception("Task could not be stopped.", failure)); + } else { + TaskOperationResult result = (TaskOperationResult) success; + if (!result.success()) { + LOG.debug("Stopping task call did not find task. Probably akka message call race."); --- End diff -- Also on `INFO` level. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
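For readers unfamiliar with the retry-then-callback shape used in `stop()`, here is a rough sketch of the same idea with the JDK's `CompletableFuture`. It is not Flink's `ActorGateway.retry`; the class `AsyncRetry` and its method names are made up for illustration, and there is no timeout handling.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical helper: re-issues an asynchronous call until it succeeds
// or the allowed number of tries is used up.
final class AsyncRetry {

    static <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> call, int tries) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(call, tries, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> call, int triesLeft, CompletableFuture<T> result) {
        call.get().whenComplete((value, failure) -> {
            if (failure == null) {
                result.complete(value);               // success: hand the value to the caller
            } else if (triesLeft > 1) {
                attempt(call, triesLeft - 1, result); // failure with tries left: call again
            } else {
                result.completeExceptionally(failure); // out of tries: surface the last failure
            }
        });
    }
}
```

The caller then attaches a single completion callback to the returned future, which is where the diff distinguishes "call failed" from "call succeeded but the task was not found".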
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46174475 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala --- @@ -95,6 +95,14 @@ object JobManagerMessages { case class CancelJob(jobID: JobID) extends RequiresLeaderSessionID /** + * Stops a (streaming) job with the given [[jobID]] at the JobManager. The result of + * stopping is sent back to the sender as a [[StoppingResponse]] message. + * + * @param jobID + */ + case class StopJob(jobID: JobID) --- End diff -- `StopJob` should extend `RequiresLeaderSessionID` in order to cope with changing leaders. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
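To illustrate why a message should carry the leader session ID, here is a simplified sketch written in Java rather than Scala. All names (`RequiresLeaderSessionId`, `LeaderSessionEnvelope`, `Receiver`) are hypothetical and the logic is an assumption about the general idea, not Flink's actual message filtering: a receiver drops any marked message that is unwrapped or tagged with a session ID other than the current leader's.

```java
import java.util.UUID;

// Marker for messages that are only valid for one leader session (hypothetical).
interface RequiresLeaderSessionId {}

final class StopJob implements RequiresLeaderSessionId {
    final String jobId;
    StopJob(String jobId) { this.jobId = jobId; }
}

// Wrapper that tags a message with the session ID the sender believes is current.
final class LeaderSessionEnvelope {
    final UUID leaderSessionId;
    final Object message;
    LeaderSessionEnvelope(UUID leaderSessionId, Object message) {
        this.leaderSessionId = leaderSessionId;
        this.message = message;
    }
}

final class Receiver {
    private final UUID currentLeaderSessionId;

    Receiver(UUID currentLeaderSessionId) {
        this.currentLeaderSessionId = currentLeaderSessionId;
    }

    // Returns true if the message is accepted for handling, false if it is dropped.
    boolean receive(Object msg) {
        if (msg instanceof LeaderSessionEnvelope) {
            LeaderSessionEnvelope env = (LeaderSessionEnvelope) msg;
            // A mismatch means the sender addressed an old leader: drop the message.
            return env.leaderSessionId.equals(currentLeaderSessionId);
        }
        // Marked messages must arrive wrapped; unmarked ones pass through.
        return !(msg instanceof RequiresLeaderSessionId);
    }
}
```

Under this scheme a `StopJob` sent before a leader change cannot be acted on by the new leader by accident, which is the "changing leaders" case the comment refers to.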
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46159742 --- Diff: docs/apis/cli.md --- @@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs. -s,--scheduledShow only scheduled programs and their JobIDs +Action "stop" stops a running program (streaming jobs only). + + Syntax: stop [OPTIONS] + "stop" action options: + -m,--jobmanagerAddress of the JobManager (master) to which + to connect. Specify 'yarn-cluster' as the + JobManager to deploy a YARN cluster for the + job. Use this flag to connect to a different --- End diff -- Is this still on your TODO list @rmetzger? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46161133 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -569,6 +571,69 @@ public int compare(JobStatusMessage o1, JobStatusMessage o2) { } /** +* Executes the STOP action. +* +* @param args Command line arguments for the stop action. +*/ + protected int stop(String[] args) { + LOG.info("Running 'stop' command."); + + StopOptions options; + try { + options = CliFrontendParser.parseStopCommand(args); + } + catch (CliArgsException e) { + return handleArgException(e); + } + catch (Throwable t) { + return handleError(t); + } + + // evaluate help flag + if (options.isPrintHelp()) { + CliFrontendParser.printHelpForStop(); + return 0; + } + + String[] stopArgs = options.getArgs(); + JobID jobId; + + if (stopArgs.length > 0) { + String jobIdString = stopArgs[0]; + try { + jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); + } + catch (Exception e) { + LOG.error("Error: The value for the Job ID is not a valid ID."); + System.out.println("Error: The value for the Job ID is not a valid ID."); + return 1; + } + } + else { + LOG.error("Missing JobID in the command line arguments."); --- End diff -- Maybe we can use `handleArgException` here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46160278 --- Diff: docs/apis/cli.md --- @@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs. -s,--scheduledShow only scheduled programs and their JobIDs +Action "stop" stops a running program (streaming jobs only). + + Syntax: stop [OPTIONS] + "stop" action options: + -m,--jobmanagerAddress of the JobManager (master) to which + to connect. Specify 'yarn-cluster' as the + JobManager to deploy a YARN cluster for the + job. Use this flag to connect to a different --- End diff -- Yes, but it's not on the first page anymore ;) ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46160961 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -569,6 +571,69 @@ public int compare(JobStatusMessage o1, JobStatusMessage o2) { } /** +* Executes the STOP action. +* +* @param args Command line arguments for the stop action. +*/ + protected int stop(String[] args) { + LOG.info("Running 'stop' command."); + + StopOptions options; + try { + options = CliFrontendParser.parseStopCommand(args); + } + catch (CliArgsException e) { + return handleArgException(e); + } + catch (Throwable t) { + return handleError(t); + } + + // evaluate help flag + if (options.isPrintHelp()) { + CliFrontendParser.printHelpForStop(); + return 0; + } + + String[] stopArgs = options.getArgs(); + JobID jobId; + + if (stopArgs.length > 0) { + String jobIdString = stopArgs[0]; + try { + jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); + } + catch (Exception e) { + LOG.error("Error: The value for the Job ID is not a valid ID."); --- End diff -- Maybe we can use the `handleError` method here. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
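The two review notes on `stop` (missing job ID, invalid job ID) both ask for error reporting to go through one shared helper instead of repeating `LOG.error` plus `System.out.println`. A rough sketch of that shape, using only the JDK: `StopCommand`, `handleArgError`, and `parseJobId` are hypothetical names, not the `CliFrontend` methods, and the 32-hex-character ID length is an assumption made for the example.

```java
// Hypothetical command skeleton: every argument problem goes through one helper
// that reports the message and yields the non-zero exit code.
final class StopCommand {

    static int handleArgError(String message) {
        System.out.println("Error: " + message);
        return 1;
    }

    // Parses a 32-character hex string into 16 bytes (assumed ID format).
    static byte[] parseJobId(String hex) {
        if (hex.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters");
        }
        byte[] bytes = new byte[16];
        for (int i = 0; i < 16; i++) {
            int hi = Character.digit(hex.charAt(2 * i), 16);
            int lo = Character.digit(hex.charAt(2 * i + 1), 16);
            if (hi < 0 || lo < 0) {
                throw new IllegalArgumentException("not a hex digit");
            }
            bytes[i] = (byte) ((hi << 4) | lo);
        }
        return bytes;
    }

    static int stop(String[] args) {
        if (args.length == 0) {
            return handleArgError("Missing JobID in the command line arguments.");
        }
        try {
            parseJobId(args[0]);
        } catch (IllegalArgumentException e) {
            return handleArgError("The value for the Job ID is not a valid ID.");
        }
        return 0; // the actual stop request would be sent here
    }
}
```

Keeping the exit code and the user-facing message in one place also makes the "return code is non-zero" assertions in the test class cover both failure paths uniformly.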
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46162472 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.client; + +import java.util.UUID; + +import akka.actor.*; +import akka.testkit.JavaTestKit; + +import org.apache.flink.client.cli.CommandLineOptions; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import scala.Option; +import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull; +import static org.junit.Assert.*; + +public class CliFrontendStopTest { + + private static ActorSystem actorSystem; + + @BeforeClass + public static void setup() { + pipeSystemOutToNull(); + actorSystem = ActorSystem.create("TestingActorSystem"); + } + + @AfterClass + public static void teardown() { + JavaTestKit.shutdownActorSystem(actorSystem); + actorSystem = null; + } + + @BeforeClass + public static void init() { --- End diff -- Why do you have 2 `@BeforeClass` methods here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46162322 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendStopTest.java --- @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client; + +import java.util.UUID; + +import akka.actor.*; +import akka.testkit.JavaTestKit; + +import org.apache.flink.client.cli.CommandLineOptions; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.instance.AkkaActorGateway; +import org.apache.flink.runtime.messages.JobManagerMessages; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import scala.Option; +import static org.apache.flink.client.CliFrontendTestUtils.pipeSystemOutToNull; +import static org.junit.Assert.*; + +public class CliFrontendStopTest { --- End diff -- `extends TestLogger`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r46160903 --- Diff: docs/apis/cli.md --- @@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs. -s,--scheduledShow only scheduled programs and their JobIDs +Action "stop" stops a running program (streaming jobs only). + + Syntax: stop [OPTIONS] + "stop" action options: + -m,--jobmanagerAddress of the JobManager (master) to which + to connect. Specify 'yarn-cluster' as the + JobManager to deploy a YARN cluster for the + job. Use this flag to connect to a different --- End diff -- Alright, but for the moment it would be ok to merge it as it is here, right? ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-160100311 I'd volunteer as a shepherd of this PR. @mjsax I'll review your PR at the latest next week. Sorry for the long period of inactivity from my side. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r45640243 --- Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.jade --- @@ -43,6 +43,10 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job") span.navbar-info-button.btn.btn-default(ng-click="cancelJob($event)") | Cancel + .navbar-info.last.first(ng-if!="job.type=='STREAMING' && (job.state=='RUNNING' || job.state=='CREATED')") --- End diff -- Not sure exactly any more... I just tried to reproduce my search... I guess it was one of these: http://jade-lang.com/reference/code/ https://stackoverflow.com/questions/6926247/nodejs-jade-escape-markup ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user sachingoel0101 commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r45640613 --- Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.jade --- @@ -43,6 +43,10 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job") span.navbar-info-button.btn.btn-default(ng-click="cancelJob($event)") | Cancel + .navbar-info.last.first(ng-if!="job.type=='STREAMING' && (job.state=='RUNNING' || job.state=='CREATED')") --- End diff -- Ah. That helps. Thanks. :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user sachingoel0101 commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r45639014 --- Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.jade --- @@ -43,6 +43,10 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job") span.navbar-info-button.btn.btn-default(ng-click="cancelJob($event)") | Cancel + .navbar-info.last.first(ng-if!="job.type=='STREAMING' && (job.state=='RUNNING' || job.state=='CREATED')") --- End diff -- Yes. Quite odd indeed. Can you point out the resource where you saw this? I'm a little curious. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r45639008 --- Diff: flink-runtime-web/web-dashboard/web/partials/jobs/job.html --- @@ -32,7 +32,7 @@ - {{ job['end-time'] | amDateFormat:'-MM-DD, H:mm:ss' }} {{job.duration | humanizeDuration:true}} - Cancel + Cancel --- End diff -- You are right... That is an old version. I forgot to commit this file. This will be ``` Cancel Stop ``` Just adding the second line. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-158912414 Just rebased. Can you please review @StephanEwen @tillrohrmann
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-158913013 @sachingoel0101 Is the UI change ok?
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-158929824 I just investigated the failing test. The test is new and passed up to now. @uce changed `ExecutionGraph.cancel()` recently, allowing a job to be canceled while it is in state "FAILING". I am not sure if this is a good solution -- I would remove the state transition from FAILING to CANCELLING. What do you think?
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user sachingoel0101 commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r45634054 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java --- @@ -188,9 +189,12 @@ public WebRuntimeMonitor( // Cancel a job via GET (for proper integration with YARN this has to be performed via GET) .GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler())) - // DELETE is the preferred way of cancelling a job (Rest-conform) + // DELETE is the preferred way of canceling a job (Rest-conform) .DELETE("/jobs/:jobid", handler(new JobCancellationHandler())) + // stop a job + .DELETE("/jobs/:jobid/stop", handler(new JobStoppingHandler())) + --- End diff -- This is somewhat counter-intuitive. `DELETE` is meant to remove the resource at the specified URI. Specifying an action as `/stop` is not a good idea. Perhaps a better idea would be to identify these actions as `/jobs/:jobid?mode=cancel` and `/jobs/:jobid?mode=stop`. @StephanEwen might have a better idea about this.
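The `?mode=cancel` / `?mode=stop` suggestion above could look roughly like the following. This is only a sketch under assumptions: `JobTerminationRouting`, `Action`, and `resolve` are hypothetical names that do not exist in Flink, and treating a missing `mode` as cancel is an assumed default chosen to keep the existing `DELETE /jobs/:jobid` behavior.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical sketch, not part of Flink: a single DELETE /jobs/:jobid route
// whose optional "mode" query parameter selects the termination action.
final class JobTerminationRouting {

    enum Action { CANCEL, STOP }

    /** Maps the query parameters of DELETE /jobs/:jobid to an action. */
    static Action resolve(Map<String, String> queryParams) {
        // Assumption: a missing "mode" keeps today's behavior (cancel).
        String mode = queryParams.containsKey("mode") ? queryParams.get("mode") : "cancel";
        if ("cancel".equals(mode)) {
            return Action.CANCEL;
        }
        if ("stop".equals(mode)) {
            return Action.STOP;
        }
        throw new IllegalArgumentException("Unknown mode: " + mode);
    }

    public static void main(String[] args) {
        System.out.println(resolve(Collections.singletonMap("mode", "stop")));
        System.out.println(resolve(Collections.<String, String>emptyMap()));
    }
}
```

With this shape the URI keeps naming the job resource only, and the handler picks the cancellation or stopping path from the resolved action.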
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r45635435 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java --- @@ -188,9 +189,12 @@ public WebRuntimeMonitor( // Cancel a job via GET (for proper integration with YARN this has to be performed via GET) .GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler())) - // DELETE is the preferred way of cancelling a job (Rest-conform) + // DELETE is the preferred way of canceling a job (Rest-conform) .DELETE("/jobs/:jobid", handler(new JobCancellationHandler())) + // stop a job + .DELETE("/jobs/:jobid/stop", handler(new JobStoppingHandler())) + --- End diff -- Will be changed to `GET` anyway. Would it be still an issue?
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user sachingoel0101 commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r45635941 --- Diff: flink-runtime-web/web-dashboard/web/partials/jobs/job.html --- @@ -32,7 +32,7 @@ - {{ job['end-time'] | amDateFormat:'-MM-DD, H:mm:ss' }} {{job.duration | humanizeDuration:true}} - Cancel + Cancel --- End diff -- This is weird. The stop button code isn't actually here. Are you sure this file is up-to-date? Can you run gulp again?
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user sachingoel0101 commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r45636371 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java --- @@ -188,9 +189,12 @@ public WebRuntimeMonitor( // Cancel a job via GET (for proper integration with YARN this has to be performed via GET) .GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler())) - // DELETE is the preferred way of cancelling a job (Rest-conform) + // DELETE is the preferred way of canceling a job (Rest-conform) .DELETE("/jobs/:jobid", handler(new JobCancellationHandler())) + // stop a job + .DELETE("/jobs/:jobid/stop", handler(new JobStoppingHandler())) + --- End diff -- `DELETE` conforms better to REST. `GET` is just a workaround to support Yarn when working on the AM proxy page. Yarn doesn't forward anything besides `GET` to the AM's HTTP server.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user sachingoel0101 commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r45634227 --- Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.jade --- @@ -43,6 +43,10 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job") span.navbar-info-button.btn.btn-default(ng-click="cancelJob($event)") | Cancel + .navbar-info.last.first(ng-if!="job.type=='STREAMING' && (job.state=='RUNNING' || job.state=='CREATED')") --- End diff -- Are you sure this works? :confused: The `!=` shouldn't be there.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user sachingoel0101 commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r45634353 --- Diff: flink-runtime-web/web-dashboard/app/scripts/modules/jobs/jobs.svc.coffee --- @@ -216,4 +216,7 @@ angular.module('flinkApp') # proper "DELETE jobs//" $http.get "jobs/" + jobid + "/yarn-cancel" + @stopJob = (jobid) -> +$http.delete "jobs/" + jobid + "/stop" --- End diff -- As I commented before, this should rather be a `GET` request at `/yarn-stop` to support Yarn cancellations.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r45635078 --- Diff: flink-runtime-web/web-dashboard/app/partials/jobs/job.jade --- @@ -43,6 +43,10 @@ nav.navbar.navbar-default.navbar-fixed-top.navbar-main(ng-if="job") span.navbar-info-button.btn.btn-default(ng-click="cancelJob($event)") | Cancel + .navbar-info.last.first(ng-if!="job.type=='STREAMING' && (job.state=='RUNNING' || job.state=='CREATED')") --- End diff -- The syntax is quite odd, but it is correct (I figured this out from the Internet). It is necessary to protect `&&` from being changed to HTML entities during compilation. I also tested it: the STOP button is only displayed for streaming jobs, and if I click it the job shuts down gracefully.
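To illustrate the escaping issue discussed here: a template engine that escapes attribute values rewrites each `&` as an HTML entity, while an unescaped attribute (what `ng-if!=` requests in Jade) passes the expression through unchanged. The sketch below only mimics that behavior in plain Java; `AttributeEscapingDemo`, `escape`, and `attribute` are hypothetical helpers, not Jade's actual implementation.

```java
// Hypothetical illustration of escaped vs. unescaped attribute rendering.
// It mimics what a template engine does; it is not Jade's actual code.
final class AttributeEscapingDemo {

    /** Minimal HTML escaping for an attribute value. */
    static String escape(String value) {
        StringBuilder sb = new StringBuilder();
        for (char c : value.toCharArray()) {
            switch (c) {
                case '&': sb.append("&amp;"); break;
                case '<': sb.append("&lt;"); break;
                case '>': sb.append("&gt;"); break;
                case '"': sb.append("&quot;"); break;
                default: sb.append(c);
            }
        }
        return sb.toString();
    }

    /** Renders name="value", escaping the value only when requested. */
    static String attribute(String name, String value, boolean escaped) {
        return name + "=\"" + (escaped ? escape(value) : value) + "\"";
    }

    public static void main(String[] args) {
        String expr = "job.type=='STREAMING' && job.state=='RUNNING'";
        // Escaped rendering: the && operator is rewritten into entities.
        System.out.println(attribute("ng-if", expr, true));
        // Unescaped rendering: the expression is emitted verbatim.
        System.out.println(attribute("ng-if", expr, false));
    }
}
```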
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r45635645 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java --- @@ -188,9 +189,12 @@ public WebRuntimeMonitor( // Cancel a job via GET (for proper integration with YARN this has to be performed via GET) .GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler())) - // DELETE is the preferred way of cancelling a job (Rest-conform) + // DELETE is the preferred way of canceling a job (Rest-conform) .DELETE("/jobs/:jobid", handler(new JobCancellationHandler())) + // stop a job + .DELETE("/jobs/:jobid/stop", handler(new JobStoppingHandler())) + --- End diff -- Why is there actually `GET` and `DELETE` for `cancel`? Why not just `GET`?
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-158050729 Opened a new JIRA for the failing build. Seems to be an unstable test. (Build passed on my personal Travis: https://travis-ci.org/mjsax/flink/builds/91941028) Please review this PR.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-157817216 Hi @mjsax, your changes to the CoffeeScript aren't ported to the actual JavaScript. It appears you forgot to run `gulp`.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-157832022 Thanks. Great. It works now. I was not aware that I have to build the UI manually. Would it be possible to integrate it into the Maven build? At least we should provide a README on how to build it (or better, a script to run). What do you think about it?
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-157833952 There is a readme: https://github.com/apache/flink/tree/master/flink-runtime-web
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-157834943 How could I miss this...?? Sorry. My own fault...
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-157712835 @sachingoel0101 I have a problem testing my changes to the WebUI. It seems that the browser does not load the correct (ie, changed) scripts... Check out the screenshot -- it only shows the unmodified `@cancelJob = (jobid) -> $http.delete "jobs/" + jobid` and the inserted `stop` part is missing (I have almost no experience in web-dev stuff). I built the whole Flink project with `mvn -DskipTests clean install`; I also deleted the Maven repo to get rid of old stuff. I tried to reload (F5 and ctrl+F5 as well as clearing the cache; using a private window did not help either). Can you help me out? The only change I can see is the new "stop" button -- so `jobs.html` is loaded correctly. ![screenshot - 11182015 - 02 20 59 pm](https://cloud.githubusercontent.com/assets/8959638/11242126/1959dfc4-8e00-11e5-9840-127174ffabc3.png)
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-157901595 Stop in WebUI is now working. After rebase, I realized that cancel was changed to GET for yarn. Do we need to implement stop the same way for yarn @mxm ?
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-157442077 I just rebased this. Due to the new JobManager WebUI, I added a second commit to add a STOP button to the WebUI (which does not work yet). I will squash both commits later on. I just pushed this rebase so that the actual changes, which are independent of the WebUI, can already be reviewed.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-157521508 I am confused why the build failed... It worked locally **and** on my personal travis: https://travis-ci.org/mjsax/flink/builds/91638981 Furthermore, if I understand the error message correctly, it complains that `MyErroneousTimestampSource` does not implement method `stop()` -- however, the code shows clearly that the method got added... (line 587, 588).
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-157529908 Travis is caching the contents of the .m2 directory. Maybe that caused some weird issues. I also had some Travis issues that made tests fail in very weird ways. If the issue persists with the next commit, we can try some stuff.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-149580271 Is there any news here?
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-143539717 Done. I split the PR and squashed the commit. @StephanEwen please let me know if it is ok.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r40334026 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java --- @@ -452,6 +453,73 @@ public void cancel(JobID jobId) throws Exception { } } + /** +* Stops a program on Flink cluster whose job-manager is configured in this client's configuration. +* Stopping works only for streaming programs. Be aware, that the program might continue to run for +* a while after sending the stop command, because after sources stopped to emit data all operators +* need to finish processing. +* +* @param jobId +*the job ID of the streaming program to stop +* @throws ProgramStopException +* If the job ID is invalid (ie, is unknown or refers to a batch job) or if sending the stop signal +* failed. That might be due to an I/O problem, ie, the job-manager is unreachable. +*/ + public void stop(JobID jobId) throws Exception { + final FiniteDuration timeout = AkkaUtils.getTimeout(configuration); + final FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration); + + ActorSystem actorSystem; + try { + actorSystem = JobClient.startJobClientActorSystem(configuration); + } catch (Exception e) { + throw new ProgramStopException("Could not start client actor system.", e); + } + + try { + ActorGateway jobManagerGateway; + + LeaderRetrievalService leaderRetrievalService; + + try { + leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration); + } catch (Exception e) { + throw new ProgramInvocationException("Could not create the leader retrieval service.", e); + } + + try { + jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway( + leaderRetrievalService, + actorSystem, + lookupTimeout); + } catch (LeaderRetrievalException e) { + throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e); + } + + Future response; + try { + response = jobManagerGateway.ask(new StopJob(jobId), 
timeout); + } catch (Exception e) { + throw new ProgramInvocationException("Failed to query the job manager gateway.", e); + } + + Object result = Await.result(response, timeout); + + if (result instanceof JobManagerMessages.StoppingSuccess) { + LOG.debug("Job stopping with ID " + jobId + " succeeded."); + } else if (result instanceof JobManagerMessages.StoppingFailure) { + Throwable t = ((JobManagerMessages.StoppingFailure) result).cause(); + LOG.debug("Job stopping with ID " + jobId + " failed.", t); --- End diff -- This should be logged on INFO level...
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r40333992 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/Client.java --- @@ -452,6 +453,73 @@ public void cancel(JobID jobId) throws Exception { } } + /** +* Stops a program on Flink cluster whose job-manager is configured in this client's configuration. +* Stopping works only for streaming programs. Be aware, that the program might continue to run for +* a while after sending the stop command, because after sources stopped to emit data all operators +* need to finish processing. +* +* @param jobId +*the job ID of the streaming program to stop +* @throws ProgramStopException +* If the job ID is invalid (ie, is unknown or refers to a batch job) or if sending the stop signal +* failed. That might be due to an I/O problem, ie, the job-manager is unreachable. +*/ + public void stop(JobID jobId) throws Exception { + final FiniteDuration timeout = AkkaUtils.getTimeout(configuration); + final FiniteDuration lookupTimeout = AkkaUtils.getLookupTimeout(configuration); + + ActorSystem actorSystem; + try { + actorSystem = JobClient.startJobClientActorSystem(configuration); + } catch (Exception e) { + throw new ProgramStopException("Could not start client actor system.", e); + } + + try { + ActorGateway jobManagerGateway; + + LeaderRetrievalService leaderRetrievalService; + + try { + leaderRetrievalService = LeaderRetrievalUtils.createLeaderRetrievalService(configuration); + } catch (Exception e) { + throw new ProgramInvocationException("Could not create the leader retrieval service.", e); + } + + try { + jobManagerGateway = LeaderRetrievalUtils.retrieveLeaderGateway( + leaderRetrievalService, + actorSystem, + lookupTimeout); + } catch (LeaderRetrievalException e) { + throw new ProgramInvocationException("Failed to retrieve JobManager gateway", e); + } + + Future response; + try { + response = jobManagerGateway.ask(new StopJob(jobId), 
timeout); + } catch (Exception e) { + throw new ProgramInvocationException("Failed to query the job manager gateway.", e); + } + + Object result = Await.result(response, timeout); + + if (result instanceof JobManagerMessages.StoppingSuccess) { + LOG.debug("Job stopping with ID " + jobId + " succeeded."); --- End diff -- This should be logged on INFO level...
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-142970422 There are some crucial issues still with this pull request: Most importantly, it changes parts that it need not change, altering system behavior. That manifests itself in the 130+ changed files, for something that should probably touch around 20 files. As per the contribution guidelines, it is very important that pull requests do not do that. We have seen too many cases where bugs were introduced because code parts were changed for no other reason than someone thinking that the code should look different in their opinion. Reporting and changing such unrelated parts should always be a separate issue. In the first few files, I see examples of - Changing the type of exceptions caught. We often explicitly catch Throwables, because we want to handle Errors as well, such as LinkageErrors which are common when the dynamic class loading fails. - Changing the way tests deal with exceptions. There are good reasons to explicitly catch and check expected exceptions (checking messages, guarding against Maven misconfigurations where exceptions propagating out of tests were not correctly recognized; I have seen that more than once so far).
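The point about catching `Throwable` rather than `Exception` can be shown with a small standalone sketch. `CatchScopeDemo` and its methods are hypothetical names used for illustration only; the `LinkageError` is thrown by hand to simulate a class loading failure.

```java
// Hypothetical standalone illustration; not taken from the Flink code base.
final class CatchScopeDemo {

    /** Handles only Exception subclasses; Errors propagate to the caller. */
    static String runCatchingException(Runnable action) {
        try {
            action.run();
            return "ok";
        } catch (Exception e) {
            return "handled " + e.getClass().getSimpleName();
        }
    }

    /** Handles everything, including Errors such as LinkageError. */
    static String runCatchingThrowable(Runnable action) {
        try {
            action.run();
            return "ok";
        } catch (Throwable t) {
            return "handled " + t.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // Simulated class loading failure.
        Runnable failing = () -> { throw new LinkageError("simulated class loading failure"); };

        System.out.println(runCatchingThrowable(failing));
        try {
            System.out.println(runCatchingException(failing));
        } catch (LinkageError e) {
            System.out.println("LinkageError escaped the catch (Exception) block");
        }
    }
}
```

Since `LinkageError` extends `Error` and not `Exception`, narrowing a handler from `Throwable` to `Exception` silently stops handling it, which is the behavior change the comment warns about.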
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-142971028 What would really help to review this is a summary of how it actually works. We currently have to piece that together from the code, which takes quite some time...
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-142974060 There are also many whitespace changes in this PR, probably due to the IDE settings, which trim lines in all files that were opened...
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r40333103 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -588,15 +653,16 @@ protected int cancel(String[] args) { ActorGateway jobManager = getJobManagerGateway(options); Future response = jobManager.ask(new CancelJob(jobId), askTimeout); - try { - Await.result(response, askTimeout); - return 0; - } - catch (Exception e) { - throw new Exception("Canceling the job with ID " + jobId + " failed.", e); + Object rc = Await.result(response, askTimeout); + + if (rc instanceof CancellationFailure) { --- End diff -- See above
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-142967362 I've just looked into the implementation of `Task.cancelTask` and there we also spawn a new thread to cancel the task. Thus, the semantics are the same for the `stopExecution` call. Since it is consistent, I'm fine with this implementation. Maybe we could rename the `stopExecution` method to `stopTask` so that it's consistent with `cancelTask`.
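A rough sketch of the "spawn a new thread to deliver the signal" pattern mentioned here, so that the caller never blocks on user code. This is not the actual `Task` implementation: `AsyncStopSketch` and the `Stoppable` interface are hypothetical names, and making the thread a daemon is an assumption.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of delivering a stop signal from a separate thread.
// It is not Flink's Task class; Stoppable is an assumed interface.
final class AsyncStopSketch {

    /** Assumed minimal contract of something that can be stopped gracefully. */
    interface Stoppable {
        void stop();
    }

    /** Calls task.stop() on a new thread and returns immediately. */
    static Thread stopTask(Stoppable task, String taskName) {
        Thread stopper = new Thread(task::stop, "Stopping " + taskName);
        stopper.setDaemon(true); // assumption: must not keep the JVM alive
        stopper.start();
        return stopper;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean running = new AtomicBoolean(true);
        Thread stopper = stopTask(() -> running.set(false), "example source");
        stopper.join(); // only the demo waits; a real caller would return right away
        System.out.println("still running: " + running.get());
    }
}
```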
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r40331106 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -263,7 +268,7 @@ protected int run(String[] args) { catch (CliArgsException e) { return handleArgException(e); } - catch (Throwable t) { + catch (Exception t) { --- End diff -- Why change this? I think it was good before...
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r40333050 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -534,7 +536,70 @@ public int compare(JobStatusMessage o1, JobStatusMessage o2) { "RunningJobs. Instead the response is of type " + result.getClass() + "."); } } - catch (Throwable t) { + catch (Exception t) { + return handleError(t); + } + } + + /** +* Executes the STOP action. +* +* @param args Command line arguments for the stop action. +*/ + protected int stop(String[] args) { + LOG.info("Running 'stop' command."); + + StopOptions options; + try { + options = CliFrontendParser.parseStopCommand(args); + } + catch (CliArgsException e) { + return handleArgException(e); + } + catch (Exception t) { + return handleError(t); + } + + // evaluate help flag + if (options.isPrintHelp()) { + CliFrontendParser.printHelpForStop(); + return 0; + } + + String[] stopArgs = options.getArgs(); + JobID jobId; + + if (stopArgs.length > 0) { + String jobIdString = stopArgs[0]; + try { + jobId = new JobID(StringUtils.hexStringToByte(jobIdString)); + } + catch (Exception e) { + LOG.error("Error: The value for the Job ID is not a valid ID."); + System.out.println("Error: The value for the Job ID is not a valid ID."); + return 1; + } + } + else { + LOG.error("Missing JobID in the command line arguments."); + System.out.println("Error: Specify a Job ID to stop a job."); + return 1; + } + + try { + ActorGateway jobManager = getJobManagerGateway(options); + Future response = jobManager.ask(new StopJob(jobId), askTimeout); + + Object rc = Await.result(response, askTimeout); + + if (rc instanceof StoppingFailure) { --- End diff -- I think that we decided not to use the built-in `Failure` object when transporting Exceptions between processes, if the Exceptions may be of user-defined classes (class loading issues). 
Using custom message types that explicitly require a `SerializedThrowable` makes it safer for users to recognize the need to supply a classloader.
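To illustrate the point about custom message types, here is a minimal sketch of a failure message that carries its cause as serialized bytes and forces the receiver to name a class loader when unpacking it. `StoppingFailureSketch` and its methods are hypothetical names for this example; this is not Flink's `StoppingFailure` or `SerializedThrowable`, only plain JDK serialization used as a stand-in.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;

/** Hypothetical message type, for illustration only. */
final class StoppingFailureSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String jobId;
    // The cause travels as raw bytes, so deserializing the message itself
    // never requires loading a user-defined exception class.
    private final byte[] serializedCause;

    StoppingFailureSketch(String jobId, Throwable cause) throws IOException {
        this.jobId = jobId;
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(cause);
        }
        this.serializedCause = bytes.toByteArray();
    }

    String jobId() {
        return jobId;
    }

    /** The caller has to state which class loader can resolve the exception class. */
    Throwable cause(final ClassLoader userCodeClassLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(serializedCause)) {
                    @Override
                    protected Class<?> resolveClass(ObjectStreamClass desc)
                            throws IOException, ClassNotFoundException {
                        try {
                            return Class.forName(desc.getName(), false, userCodeClassLoader);
                        } catch (ClassNotFoundException e) {
                            // Fall back to the default lookup (e.g. for primitive types).
                            return super.resolveClass(desc);
                        }
                    }
                }) {
            return (Throwable) in.readObject();
        }
    }
}
```

Because the class-loader argument is part of the accessor's signature, a caller cannot read the cause without deciding where user classes come from.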
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-142973877 I am curious here: is the `stop()` method implemented differently from the `cancel()` method in any function? What would speak against implementing the entire stopping behavior in a super lightweight way, by simply canceling the sources and transitioning them to `FINISHED` instead of `CANCELED` upon success? Would that not give us the same thing?
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-142999802 Thanks for the review! Just to clarify: I did not apply any changes "just for fun". But I agree that this PR contains more changes than are actually related to it. For example, I wrote some tests according to the existing "test pattern" of a package and was told that this test pattern should be changed. This also covers catching exceptions... So if these changes are "wrong", I was instructed incorrectly. In order to "clean up", I changed the test pattern not just for my own new tests but for the whole package (this might have been overeager...). Anyway, I agree that this PR should be split and that the changes to the tests should go into their own PR -- we can discuss there which test changes make sense.
How STOP works:
- the `stop` signal is only sent to streaming source tasks
- thus, batch jobs and streaming jobs are distinguished now (for batch jobs, no stop signal can be triggered at all)
- only streaming sources implement the `Stoppable` interface and receive the stop signal
- no explicit state change is necessary -- after streaming tasks return from "run", the sources automatically go to "finishing" and close their output channels, which propagates through the whole ExecutionGraph, and all tasks finish automatically. Thus the overall job is marked as SUCCESS and not FAILED
- for the JobManager UI, sources set a flag that they received the stop signal (which is displayed with a checkmark). This might be helpful if it takes some time before a source actually returns from `run()` and switches from RUNNING into FINISHING (otherwise, the user has no feedback on whether STOP was issued or not)
About `stop()` vs `cancel()`: we need a dedicated stop signal because the stop signal only goes to streaming sources (and no other tasks) and does not trigger an explicit state change. For the user function, I used `cancel()` in the beginning and was told that you had agreed on the introduction of a `stop()` method, so I introduced it only at the very end. I think there are some differences (see the JavaDoc I added for stop and cancel), so `UDF.stop()` might be useful (I am thinking of the exactly-once guarantee). If a job is canceled, it is a hard stop and exactly-once need not be preserved (tuples emitted by the sources are not processed, as all tasks of the job get canceled at the same time). Not so for stop, which gives exactly-once; the job is shut down cleanly from the sources to the sinks, and all data emitted by the sources gets processed.
About whitespace formatting: there might be a few such changes, but a lot of changes look like WS formatting and are not. This happens if a whole block is removed (I did remove some try-catch blocks) and there are empty lines (containing only WS indentation) within the block. The diff does not display the changes nicely, and consecutive lines look like WS formatting even if they are not (a tab was removed due to the removed block). I enable auto-formatting in Eclipse only for lines I changed... I will split and update this PR, and we can then take it from there. Does that make sense?
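The stop semantics described in this comment can be sketched roughly as follows. This is a simplified illustration, not the code of the PR: `StoppableSketch` and `CountingSourceSketch` are hypothetical names, and the record sink is modeled as a plain `java.util.function.Consumer`. The point is that `stop()` only flips a flag, and the source then leaves `run()` normally, which is what would let the task finish rather than be canceled.

```java
import java.util.function.Consumer;

/** Hypothetical counterpart of the Stoppable interface mentioned above. */
interface StoppableSketch {
    void stop();
}

/** A source that emits consecutive numbers until it is asked to stop. */
final class CountingSourceSketch implements StoppableSketch {
    // volatile: stop() is expected to be called from a different thread than run()
    private volatile boolean running = true;
    private long next = 0;

    void run(Consumer<Long> out) {
        while (running) {
            out.accept(next++);
        }
        // Returning normally here (no exception, no interrupt) corresponds to
        // the source finishing on its own; everything emitted so far is still
        // handed downstream.
    }

    @Override
    public void stop() {
        running = false; // non-blocking: only sets the flag
    }
}
```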
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-142345277 Thanks. Makes sense. I will wait for the feedback from Till and Stephan and fix this afterwards. This PR needs a rebase anyway, so I can do all of this when merging (if Till and Stephan give their OK).
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-142357249 Will have a look at this tomorrow. This touches some critical parts, so it needs a bit of time and calm to review. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-142331659 I think the implementation of `stopExecution()` is fine. You are executing the call in a separate thread and you are logging exceptions. A blocking stop() method implemented by a user would not block the actor. I would catch `Throwable` instead of `RuntimeException`, just to ensure that really no exception is thrown out of the thread. This PR is touching very critical components of Flink. I would really appreciate a review by @tillrohrmann and @StephanEwen.
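A rough sketch of the pattern discussed in this comment: the user's `stop()` runs in its own thread so that a blocking implementation cannot stall the caller, and `Throwable` is caught so that nothing escapes that thread. `StopInvokerSketch` and `stopAsync` are hypothetical names, and the error log is modeled as a `Consumer<Throwable>`; this is not the PR's actual `stopExecution()` code.

```java
import java.util.function.Consumer;

final class StopInvokerSketch {
    private StopInvokerSketch() {}

    /**
     * Runs the user-supplied stop logic in a separate daemon thread and
     * reports anything it throws to the given error log.
     */
    static Thread stopAsync(Runnable userStop, Consumer<Throwable> errorLog) {
        Thread stopper = new Thread(() -> {
            try {
                userStop.run();
            } catch (Throwable t) {
                // Throwable rather than RuntimeException, so that Errors
                // raised by user code are logged as well.
                errorLog.accept(t);
            }
        }, "stop-invoker-sketch");
        stopper.setDaemon(true);
        stopper.start();
        return stopper; // returned only so callers and tests can join on it
    }
}
```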
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-141926593 Any news here? What is the opinion on Till's concern about using a `Future`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-140301180 Hi, I cleaned up all old commits and put a new commit on top that introduces `SourceFunction.stop()` and unblocks the stop signal by using a separate thread. Please give feedback. Btw: Travis fails due to an unstable test. My own Travis is green: https://travis-ci.org/mjsax/flink/builds/80344479
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-140335366 IMO, it would be better to wrap the `stopExecution` call in the `TaskManager` into a `Future`, for the following reasons. First of all, with the current implementation, you'll miss all exceptions which occur in the `stop` method. Secondly, you will send a positive `TaskOperationResult` back before the stopping was executed. I haven't checked the semantics of the `TaskOperationResult`, but it might be the case that the JM, upon receiving this message, thinks that the stop call was successfully executed.
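The future-based alternative suggested here could look roughly like the sketch below. It uses the JDK's `CompletableFuture` as a stand-in for the Scala `Future` that the `TaskManager` would use, and `StopAckSketch` / `FutureStopSketch` are hypothetical names, not Flink's `TaskOperationResult`. The acknowledgement is produced only after `stop()` has returned or thrown, so a failure is not reported as success.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;

/** Hypothetical acknowledgement message. */
final class StopAckSketch {
    final boolean success;
    final String description;

    StopAckSketch(boolean success, String description) {
        this.success = success;
        this.description = description;
    }
}

final class FutureStopSketch {
    private FutureStopSketch() {}

    /** Runs the stop logic on the executor and completes with an ack once it is done. */
    static CompletableFuture<StopAckSketch> stopAndAck(Runnable userStop, Executor executor) {
        return CompletableFuture.runAsync(userStop, executor).handle((ignored, error) -> {
            if (error == null) {
                return new StopAckSketch(true, "stop() returned");
            }
            // Exceptions from the async task may arrive wrapped in a CompletionException.
            Throwable cause =
                    (error instanceof CompletionException && error.getCause() != null)
                            ? error.getCause()
                            : error;
            return new StopAckSketch(false, String.valueOf(cause.getMessage()));
        });
    }
}
```

Whether the JobManager should wait for such an acknowledgement is exactly the open design question in this thread; the sketch only shows that both outcomes can be observed.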
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-140375020 I think it should be fine. The `TaskOperationResult` should only indicate that the signal was delivered successfully (i.e., only sent to "streaming sources"). All other operators would raise an exception. If the user did not implement `stop()`, nothing should happen... (at least from my point of view -- "stop" is only a request and the source need not obey it -- in contrast to cancel). If this does not match the opinion of the majority, we can of course change it. (We would not need an additional task thread if we use a future, right?) In order to see possible exceptions, I just added an additional try-catch around `SourceFunction.stop()` to log them.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-139194317 > The cancel only sets an internal variable. That is probably true for all sources that Flink provides. But the interface for sources is a public API, and trusting that users are implementing this properly is dangerous. It can basically lock the entire Flink cluster. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-138569872 The `cancel` in the SourceFunction interface? As I said, I think it should be non-blocking. The cancel only sets an internal variable. The source must then respond by exiting from its main loop and doing any cleanup in close or after the main loop in `invoke`.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-138587744 Yes. That is what I assumed when writing the code for this PR. The question is whether to simply assume that `cancel` behaves this way or to try to enforce it somehow. Furthermore, the assumption about non-blocking behavior is not well documented. Should we at least update the JavaDoc for `cancel`? This question is blocking this PR from getting merged. So we need to come to a conclusion on whether there is an issue or not (and whether I need to change my code or this PR can just get merged). I personally would only update the JavaDoc.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-138651481 I would be in favor of documenting it in the JavaDoc. What do you think, @rmetzger, as the one who probably knows the Kafka source best?
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r38856545 --- Diff: docs/apis/cli.md --- @@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs. -s,--scheduledShow only scheduled programs and their JobIDs +Action "stop" stops a running program (streaming jobs only). + + Syntax: stop [OPTIONS] + "stop" action options: + -m,--jobmanagerAddress of the JobManager (master) to which + to connect. Specify 'yarn-cluster' as the + JobManager to deploy a YARN cluster for the + job. Use this flag to connect to a different --- End diff -- If you put "-m yarn-cluster", it will deploy a YARN cluster from the client. The "-m yarn cluster" option should only be available to the "run" command. For stop, users should usually not specify the JobManager address, because - in standalone mode, the JM address is in the conf - in YARN Mode, the JM address is in a special `.yarn-properties` file. Only if that is not working, users can use the "-m ". --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r38859306 --- Diff: docs/apis/cli.md --- @@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs. -s,--scheduledShow only scheduled programs and their JobIDs +Action "stop" stops a running program (streaming jobs only). + + Syntax: stop [OPTIONS] + "stop" action options: + -m,--jobmanagerAddress of the JobManager (master) to which + to connect. Specify 'yarn-cluster' as the + JobManager to deploy a YARN cluster for the + job. Use this flag to connect to a different --- End diff -- I understand. Makes sense. I just copied it from other commands (and was wondering about it anyway ;) -- maybe I should have asked right away). The option `-m` is also specified for `cancel` and `list`. I guess it requires a complete clean-up. Should I just include this in this PR, or should we open a JIRA (and I just leave this inconsistency as is here)?
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on the pull request: https://github.com/apache/flink/pull/750#issuecomment-138278500 Any input on the blocking/non-blocking question about `cancel` ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r38863884 --- Diff: docs/apis/cli.md --- @@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs. -s,--scheduledShow only scheduled programs and their JobIDs +Action "stop" stops a running program (streaming jobs only). + + Syntax: stop [OPTIONS] + "stop" action options: + -m,--jobmanagerAddress of the JobManager (master) to which + to connect. Specify 'yarn-cluster' as the + JobManager to deploy a YARN cluster for the + job. Use this flag to connect to a different --- End diff -- :+1: --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r38863585 --- Diff: docs/apis/cli.md --- @@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs. -s,--scheduledShow only scheduled programs and their JobIDs +Action "stop" stops a running program (streaming jobs only). + + Syntax: stop [OPTIONS] + "stop" action options: + -m,--jobmanagerAddress of the JobManager (master) to which + to connect. Specify 'yarn-cluster' as the + JobManager to deploy a YARN cluster for the + job. Use this flag to connect to a different --- End diff -- I'll write it down on my TODO list and fix it with the next YARN-related pull request (I'm planning to clean up the whole code a bit anyways) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r38635830 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -411,6 +411,23 @@ class TaskManager( log.debug(s"Cannot find task to fail for execution ${executionID})") } +// stops a task +case StopTask(executionID) => + val task = runningTasks.get(executionID) + if (task != null) { +try { + task.stopExecution() --- End diff -- So what should the next step be? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r38508028 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -411,6 +411,23 @@ class TaskManager( log.debug(s"Cannot find task to fail for execution ${executionID})") } +// stops a task +case StopTask(executionID) => + val task = runningTasks.get(executionID) + if (task != null) { +try { + task.stopExecution() --- End diff -- About Kafka, I think the approach would be to have `cancel()` signal to the source that it should close itself, then have the actual shutdown/cleanup in `close()` of RichFunction. What do you think about this @rmetzger ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r38507948 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -411,6 +411,23 @@ class TaskManager( log.debug(s"Cannot find task to fail for execution ${executionID})") } +// stops a task +case StopTask(executionID) => + val task = runningTasks.get(executionID) + if (task != null) { +try { + task.stopExecution() --- End diff -- I agree with @mjsax that the cancel should be non-blocking. The method seems to be badly named; it should maybe have been named `stop()` from the beginning. Then it would be more obvious that it is just used for shutting down the source, no matter what the reason might be. Unfortunately, `close()` is also taken as part of `RichFunction`. I think we have two options: keep `cancel` and use it for stopping, no matter what the reason is, or rename it to `stop()` and use it in the same way to make it clearer what it does.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r38511039 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -411,6 +411,23 @@ class TaskManager( log.debug(s"Cannot find task to fail for execution ${executionID})") } +// stops a task +case StopTask(executionID) => + val task = runningTasks.get(executionID) + if (task != null) { +try { + task.stopExecution() --- End diff -- I have to admit that this is the first time I'm looking into this pull request ;) Regarding the whole blocking / non-blocking discussion: I think even if we wrote into the javadocs that the cancel()/stop() call has to be implemented in a non-blocking fashion, there would still be users who do it wrong! It's just too risky to block the entire actor system by erroneous user code. (Aren't our windowing threads doing some magic in the close methods as well?!) I think that the canceling of Tasks in the task manager is also done using separate cancelling threads? For the Kafka source: I believe we can move the "fetcher.close()" and offsetHandler.close() into the close() method as well. (We would probably need to add a cancel() method to the Fetcher interface.) But I would not touch the Kafka consumer and make the stop() mechanism more robust.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r38511106 --- Diff: docs/apis/cli.md --- @@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs. -s,--scheduledShow only scheduled programs and their JobIDs +Action "stop" stops a running program (streaming jobs only). + + Syntax: stop [OPTIONS] + "stop" action options: + -m,--jobmanagerAddress of the JobManager (master) to which + to connect. Specify 'yarn-cluster' as the + JobManager to deploy a YARN cluster for the + job. Use this flag to connect to a different --- End diff -- what happens when a user enters "stop -m yarn-cluster" ? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r38515501 --- Diff: docs/apis/cli.md --- @@ -185,6 +189,18 @@ Action "list" lists running and scheduled programs. -s,--scheduledShow only scheduled programs and their JobIDs +Action "stop" stops a running program (streaming jobs only). + + Syntax: stop [OPTIONS] + "stop" action options: + -m,--jobmanagerAddress of the JobManager (master) to which + to connect. Specify 'yarn-cluster' as the + JobManager to deploy a YARN cluster for the + job. Use this flag to connect to a different --- End diff -- I never tried it in a YARN cluster. Do you think it needs special handling? Basically, the "stop" signal is sent to the JobManager, which forwards the signal to all sources. This should be independent of whether YARN is involved or not. But I don't have any experience with YARN. Tell me if I am wrong.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r38516042 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -411,6 +411,23 @@ class TaskManager( log.debug(s"Cannot find task to fail for execution ${executionID})") } +// stops a task +case StopTask(executionID) => + val task = runningTasks.get(executionID) + if (task != null) { +try { + task.stopExecution() --- End diff -- Would it be possible to change the interface to an abstract class that provides a protected member `isRunning` (or similar) and implements a `public final void stop()` method by setting `isRunning = false`? That way, we would ensure that the call is non-blocking. The user needs to check the `isRunning` flag in the `run()` method.
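The abstract-class idea from this comment might look like the following sketch. `AbstractStoppableSourceSketch` is a hypothetical name, and, as a small deviation from the wording above, the flag is kept private and exposed through a protected accessor so that subclasses cannot reset it. Because `stop()` is final, user code cannot make it block.

```java
/** Hypothetical base class; not part of Flink's API. */
abstract class AbstractStoppableSourceSketch {
    // volatile so that a stop() issued from another thread becomes visible to run()
    private volatile boolean isRunning = true;

    /** Subclasses poll this in their main loop. */
    protected final boolean isRunning() {
        return isRunning;
    }

    /** Final: the framework controls the implementation, so it cannot block. */
    public final void stop() {
        isRunning = false;
    }

    /** User code; expected to return once isRunning() turns false. */
    public abstract void run();
}
```

The trade-off is that sources would have to extend a base class instead of implementing an interface, and a `run()` method that never checks the flag would still not terminate.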
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user mjsax commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r38458556 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -411,6 +411,23 @@ class TaskManager( log.debug(s"Cannot find task to fail for execution ${executionID})") } +// stops a task +case StopTask(executionID) => + val task = runningTasks.get(executionID) + if (task != null) { +try { + task.stopExecution() --- End diff -- @aljoscha do you have any input on this? I am not sure what the next step should be here? @tillrohrmann do you have any other comments on the whole PR? I would like to finish this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r38270665 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -411,6 +411,23 @@ class TaskManager( log.debug(s"Cannot find task to fail for execution ${executionID})") } +// stops a task +case StopTask(executionID) => + val task = runningTasks.get(executionID) + if (task != null) { +try { + task.stopExecution() --- End diff -- I assume that the stops should be idempotent. But I agree: if we document and check that all `cancel` calls are non-blocking, then it should work as well.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r38179569 --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/test/java/org/apache/flink/stormcompatibility/api/FlinkTestCluster.java --- @@ -25,15 +25,21 @@ import backtype.storm.generated.StormTopology; import backtype.storm.generated.SubmitOptions; import backtype.storm.generated.TopologyInfo; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.util.TestStreamEnvironment; +import java.util.HashMap; import java.util.Map; /** * {@link FlinkTestCluster} mimics a Storm {@link LocalCluster} for ITCases via a {@link TestStreamEnvironment}. */ public class FlinkTestCluster extends FlinkLocalCluster { + // TODO remove and look up job --- End diff -- If you create a JIRA issue for this, then I think that you can remove the TODO.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r38179508 --- Diff: flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/StormWordCountLocal.java --- @@ -67,8 +67,10 @@ public static void main(final String[] args) throws Exception { Utils.sleep(10 * 1000); - // TODO kill does no do anything so far cluster.killTopology(topologyId); + // killing sends STOP signal, takes some time to clean up + Utils.sleep(1000); --- End diff -- Hmm, what is the advantage of conserving this behaviour compared to not conserving it?
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r38186444 --- Diff: flink-clients/src/test/java/org/apache/flink/client/CliFrontendInfoTest.java --- @@ -107,28 +89,28 @@ public InfoTestCliFrontend(int expectedDop) throws Exception { @Override protected Client getClient(CommandLineOptions options, ClassLoader loader, String programName, int par) throws Exception - { + { return new TestClient(expectedDop); - } + } } - + private static final class TestClient extends Client { - + private final int expectedDop; - + private TestClient(int expectedDop) throws Exception { super(new InetSocketAddress(InetAddress.getLocalHost(), 6176), new Configuration(), CliFrontendInfoTest.class.getClassLoader(), -1); - + this.expectedDop = expectedDop; } - + @Override public String getOptimizedPlanAsJson(PackagedProgram prog, int parallelism) throws CompilerException, ProgramInvocationException - { + { --- End diff -- Same here with the indentation. But might also be a Github rendering problem.
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r38179870 --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala --- @@ -411,6 +411,23 @@ class TaskManager( log.debug(s"Cannot find task to fail for execution ${executionID})") } +// stops a task +case StopTask(executionID) => + val task = runningTasks.get(executionID) + if (task != null) { +try { + task.stopExecution() --- End diff -- Hmm, most sources ... does not look like a hard contract to me. I agree with you that we should state this more clearly in the description. What we could also do is to execute the `stop` function in a `Future`.
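The idea of running `stop` in a `Future` can be sketched as follows. The code under review is Scala and would presumably use `scala.concurrent.Future`; this sketch swaps in Java's `CompletableFuture` as an analogue. `AsyncStopSketch`, `Stoppable` and `stopAsync` are hypothetical names and not part of the PR. The point is only that the caller (in the PR, the TaskManager's message handler) returns immediately even when a user-defined `stop()` blocks.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncStopSketch {
    /** Hypothetical interface standing in for whatever exposes stop(). */
    interface Stoppable {
        void stop() throws Exception;
    }

    /** Runs stop() on the given executor so the calling thread is never blocked. */
    static CompletableFuture<Void> stopAsync(Stoppable task, ExecutorService executor) {
        return CompletableFuture.runAsync(() -> {
            try {
                task.stop();
            } catch (Exception e) {
                // Surface the failure through the future instead of the caller's thread.
                throw new RuntimeException("Stopping the task failed.", e);
            }
        }, executor);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);

        // A stop() that blocks until released, simulating user code that
        // does not honour the "stop must not block" expectation.
        CompletableFuture<Void> done = stopAsync(release::await, executor);
        System.out.println("caller continues; stop finished yet: " + done.isDone());

        release.countDown();
        done.get(5, TimeUnit.SECONDS);
        System.out.println("stop finished: " + done.isDone());
        executor.shutdown();
    }
}
```

A dedicated executor is used here so that a blocking `stop()` cannot starve other work. Which execution context would be appropriate inside the TaskManager is a separate design question this sketch does not answer.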
[GitHub] flink pull request: [FLINK-2111] Add "stop" signal to cleanly shut...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/750#discussion_r38186259 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -588,15 +653,16 @@ protected int cancel(String[] args) { ActorGateway jobManager = getJobManagerGateway(options); Future<Object> response = jobManager.ask(new CancelJob(jobId), askTimeout); - try { - Await.result(response, askTimeout); - return 0; - } - catch (Exception e) { - throw new Exception("Canceling the job with ID " + jobId + " failed.", e); + Object rc = Await.result(response, askTimeout); + + if (rc instanceof CancellationFailure) { --- End diff -- Maybe we can do the same with the `CancellationFailure` as with the `StopFailure`, because I haven't found a place where we use the `jobID` field of the `CancellationFailure` instance.
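The reply handling in the diff above, where the client inspects the answer instead of relying on `Await.result` to throw, might look roughly like the sketch below. All types are local, hypothetical stand-ins and not Flink's message classes. In particular, a failure message that carries only a cause and no `jobID` is an assumption about what "the same as with the `StopFailure`" means, and `handleCancelReply` is an invented helper.

```java
class CancelReplySketch {
    /** Hypothetical success reply; a local stand-in, not Flink's message class. */
    static final class CancellationSuccess { }

    /** Hypothetical failure reply carrying only the cause (no jobID field; an assumption). */
    static final class CancellationFailure {
        private final Throwable cause;

        CancellationFailure(Throwable cause) {
            this.cause = cause;
        }

        Throwable cause() {
            return cause;
        }
    }

    /** Maps the untyped reply to an exit code; throws if the reply reports a failure. */
    static int handleCancelReply(Object reply, String jobId) throws Exception {
        if (reply instanceof CancellationFailure) {
            // The client already knows which job it asked about, so the
            // message itself does not need to repeat the job ID.
            throw new Exception("Canceling the job with ID " + jobId + " failed.",
                    ((CancellationFailure) reply).cause());
        }
        // Any other reply is treated as success in this sketch.
        return 0;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(handleCancelReply(new CancellationSuccess(), "job-1"));
        try {
            handleCancelReply(new CancellationFailure(new IllegalStateException("not running")), "job-1");
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
```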