[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384536#comment-16384536 ] ASF GitHub Bot commented on FLINK-8459: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5622 > Implement cancelWithSavepoint in RestClusterClient > -- > > Key: FLINK-8459 > URL: https://issues.apache.org/jira/browse/FLINK-8459 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0, 1.6.0 > > > Implement the method > {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String > savepointDirectory)}}. > by either taking a savepoint and cancel the job separately, or by migrating > the logic in {{JobCancellationWithSavepointHandlers}}. The former will have > different semantics because the checkpoint scheduler is not stopped. Thus it > is not guaranteed that there won't be additional checkpoints between the > savepoint and the job cancelation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1638#comment-1638 ] ASF GitHub Bot commented on FLINK-8459: --- Github user yanghua commented on the issue: https://github.com/apache/flink/pull/5565 @GJL OK, that's all right. I will close this PR now. > Implement cancelWithSavepoint in RestClusterClient > -- > > Key: FLINK-8459 > URL: https://issues.apache.org/jira/browse/FLINK-8459 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Implement the method > {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String > savepointDirectory)}}. > by either taking a savepoint and cancel the job separately, or by migrating > the logic in {{JobCancellationWithSavepointHandlers}}. The former will have > different semantics because the checkpoint scheduler is not stopped. Thus it > is not guaranteed that there won't be additional checkpoints between the > savepoint and the job cancelation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384445#comment-16384445 ] ASF GitHub Bot commented on FLINK-8459: --- Github user yanghua closed the pull request at: https://github.com/apache/flink/pull/5565 > Implement cancelWithSavepoint in RestClusterClient > -- > > Key: FLINK-8459 > URL: https://issues.apache.org/jira/browse/FLINK-8459 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Implement the method > {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String > savepointDirectory)}}. > by either taking a savepoint and cancel the job separately, or by migrating > the logic in {{JobCancellationWithSavepointHandlers}}. The former will have > different semantics because the checkpoint scheduler is not stopped. Thus it > is not guaranteed that there won't be additional checkpoints between the > savepoint and the job cancelation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383760#comment-16383760 ] ASF GitHub Bot commented on FLINK-8459: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171888774 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBody.java --- @@ -36,15 +36,26 @@ @Nullable private final String targetDirectory; + private final boolean cancelJob; --- End diff -- fixed > Implement cancelWithSavepoint in RestClusterClient > -- > > Key: FLINK-8459 > URL: https://issues.apache.org/jira/browse/FLINK-8459 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Implement the method > {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String > savepointDirectory)}}. > by either taking a savepoint and cancel the job separately, or by migrating > the logic in {{JobCancellationWithSavepointHandlers}}. The former will have > different semantics because the checkpoint scheduler is not stopped. Thus it > is not guaranteed that there won't be additional checkpoints between the > savepoint and the job cancelation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383745#comment-16383745 ] ASF GitHub Bot commented on FLINK-8459: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171886987 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBody.java --- @@ -36,15 +36,26 @@ @Nullable private final String targetDirectory; + private final boolean cancelJob; --- End diff -- Not annotated with `@JsonProperty` > Implement cancelWithSavepoint in RestClusterClient > -- > > Key: FLINK-8459 > URL: https://issues.apache.org/jira/browse/FLINK-8459 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Implement the method > {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String > savepointDirectory)}}. > by either taking a savepoint and cancel the job separately, or by migrating > the logic in {{JobCancellationWithSavepointHandlers}}. The former will have > different semantics because the checkpoint scheduler is not stopped. Thus it > is not guaranteed that there won't be additional checkpoints between the > savepoint and the job cancelation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383628#comment-16383628 ] ASF GitHub Bot commented on FLINK-8459: --- Github user GJL commented on the issue: https://github.com/apache/flink/pull/5565 @yanghua Thanks for your contribution but I am afraid to tell you that we have changed our mind and decided that it is better not to allow checkpoints between the savepoint and job cancelation. #5622 already implements this. cc: @tillrohrmann > Implement cancelWithSavepoint in RestClusterClient > -- > > Key: FLINK-8459 > URL: https://issues.apache.org/jira/browse/FLINK-8459 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Implement the method > {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String > savepointDirectory)}}. > by either taking a savepoint and cancel the job separately, or by migrating > the logic in {{JobCancellationWithSavepointHandlers}}. The former will have > different semantics because the checkpoint scheduler is not stopped. Thus it > is not guaranteed that there won't be additional checkpoints between the > savepoint and the job cancelation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383624#comment-16383624 ] ASF GitHub Bot commented on FLINK-8459: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171858873 --- Diff: flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointTriggerException; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Assume; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.isOneOf; + 
+/** + * Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean, Time)}. + * + * @see org.apache.flink.runtime.jobmaster.JobMaster + */ +@Category(Flip6.class) +public class JobMasterTriggerSavepointIT extends AbstractTestBase { + + private static CountDownLatch invokeLatch; + + private static volatile CountDownLatch triggerCheckpointLatch; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private Path savepointDirectory; + private MiniClusterClient clusterClient; + private JobGraph jobGraph; + + @Before + public void setUp() throws Exception { + invokeLatch = new CountDownLatch(1); + triggerCheckpointLatch = new CountDownLatch(1); + savepointDirectory = temporaryFolder.newFolder().toPath(); + + Assume.assumeTrue( + "ClusterClient is not an instance of MiniClusterClient", + miniClusterResource.getClusterClient() instanceof MiniClusterClient); + + clusterClient = (MiniClusterClient) miniClusterResource.getClusterClient(); + clusterClient.setDetached(true); + + jobGraph = new JobGraph(); + + final JobVertex vertex = new JobVertex("testVertex")
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383622#comment-16383622 ] ASF GitHub Bot commented on FLINK-8459: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171858593 --- Diff: flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointTriggerException; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Assume; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.isOneOf; + 
+/** + * Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean, Time)}. + * + * @see org.apache.flink.runtime.jobmaster.JobMaster + */ +@Category(Flip6.class) +public class JobMasterTriggerSavepointIT extends AbstractTestBase { + + private static CountDownLatch invokeLatch; + + private static volatile CountDownLatch triggerCheckpointLatch; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private Path savepointDirectory; + private MiniClusterClient clusterClient; + private JobGraph jobGraph; + + @Before + public void setUp() throws Exception { + invokeLatch = new CountDownLatch(1); + triggerCheckpointLatch = new CountDownLatch(1); + savepointDirectory = temporaryFolder.newFolder().toPath(); + + Assume.assumeTrue( + "ClusterClient is not an instance of MiniClusterClient", + miniClusterResource.getClusterClient() instanceof MiniClusterClient); + + clusterClient = (MiniClusterClient) miniClusterResource.getClusterClient(); + clusterClient.setDetached(true); + + jobGraph = new JobGraph(); + + final JobVertex vertex = new JobVertex("testVertex")
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383623#comment-16383623 ] ASF GitHub Bot commented on FLINK-8459: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171858662 --- Diff: flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java --- @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.program.MiniClusterClient; +import org.apache.flink.runtime.checkpoint.CheckpointMetaData; +import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy; +import org.apache.flink.runtime.checkpoint.CheckpointTriggerException; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; +import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.testutils.category.Flip6; +import org.apache.flink.util.ExceptionUtils; + +import org.junit.Assume; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.isOneOf; + 
+/** + * Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean, Time)}. + * + * @see org.apache.flink.runtime.jobmaster.JobMaster + */ +@Category(Flip6.class) +public class JobMasterTriggerSavepointIT extends AbstractTestBase { + + private static CountDownLatch invokeLatch; + + private static volatile CountDownLatch triggerCheckpointLatch; + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private Path savepointDirectory; + private MiniClusterClient clusterClient; + private JobGraph jobGraph; + + @Before + public void setUp() throws Exception { + invokeLatch = new CountDownLatch(1); + triggerCheckpointLatch = new CountDownLatch(1); + savepointDirectory = temporaryFolder.newFolder().toPath(); + + Assume.assumeTrue( --- End diff -- shouldn't happen if category is `flip6` > Implement cancelWithSavepoint in RestClusterClient > -- > > Key: FLINK-8459 > URL: https://issues.apache.org/jira/browse/FLINK-8459 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Gary Yao >
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383619#comment-16383619 ] ASF GitHub Bot commented on FLINK-8459: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171858158 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBodyTest.java --- @@ -34,8 +34,8 @@ } @Override - protected SavepointTriggerRequestBody getTestRequestInstance() throws Exception { - return new SavepointTriggerRequestBody("/tmp"); + protected SavepointTriggerRequestBody getTestRequestInstance() { + return new SavepointTriggerRequestBody("/tmp", true); --- End diff -- strictly speaking the `false` case should be tested as well > Implement cancelWithSavepoint in RestClusterClient > -- > > Key: FLINK-8459 > URL: https://issues.apache.org/jira/browse/FLINK-8459 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Implement the method > {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String > savepointDirectory)}}. > by either taking a savepoint and cancel the job separately, or by migrating > the logic in {{JobCancellationWithSavepointHandlers}}. The former will have > different semantics because the checkpoint scheduler is not stopped. Thus it > is not guaranteed that there won't be additional checkpoints between the > savepoint and the job cancelation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383618#comment-16383618 ] ASF GitHub Bot commented on FLINK-8459: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171857970 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) { @Override public CompletableFuture<String> triggerSavepoint( - @Nullable final String targetDirectory, - final Time timeout) { - try { - return executionGraph.getCheckpointCoordinator() - .triggerSavepoint(System.currentTimeMillis(), targetDirectory) - .thenApply(CompletedCheckpoint::getExternalPointer); - } catch (Exception e) { - return FutureUtils.completedExceptionally(e); + @Nullable final String targetDirectory, + final boolean cancelJob, + final Time timeout) { + + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + if (checkpointCoordinator == null) { + return FutureUtils.completedExceptionally(new IllegalStateException( + String.format("Job %s is not a streaming job.", jobGraph.getJobID()))); + } + + if (cancelJob) { + checkpointCoordinator.stopCheckpointScheduler(); + } + return checkpointCoordinator + .triggerSavepoint(System.currentTimeMillis(), targetDirectory) + .thenApply(CompletedCheckpoint::getExternalPointer) + .thenApplyAsync(path -> { + if (cancelJob) { + log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID()); + cancel(timeout); + } + return path; + }, getMainThreadExecutor()) + .exceptionally(throwable -> { + if (cancelJob) { + startCheckpointScheduler(checkpointCoordinator); + } + throw new CompletionException(throwable); + }); + } + + private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) { --- End diff -- Method can be reused in the job rescaling logic. > Implement cancelWithSavepoint in RestClusterClient > -- > > Key: FLINK-8459 > URL: https://issues.apache.org/jira/browse/FLINK-8459 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Implement the method > {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String > savepointDirectory)}}. > by either taking a savepoint and cancel the job separately, or by migrating > the logic in {{JobCancellationWithSavepointHandlers}}. The former will have > different semantics because the checkpoint scheduler is not stopped. Thus it > is not guaranteed that there won't be additional checkpoints between the > savepoint and the job cancelation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
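The control flow in the quoted `triggerSavepoint` diff (stop the checkpoint scheduler, take the savepoint, cancel, and restart the scheduler on failure) can be sketched in isolation. This is a minimal illustration and not the Flink code: `Coordinator`, `RecordingCoordinator`, and the `cancel` callback are hypothetical stand-ins, the futures complete synchronously, and plain `thenApply` is used where the diff uses `thenApplyAsync` with the main thread executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Illustrative sketch only; none of these types are Flink classes.
class CancelWithSavepointSketch {

    // Hypothetical stand-in for the checkpoint coordinator.
    interface Coordinator {
        void stopScheduler();
        void startScheduler();
        CompletableFuture<String> triggerSavepoint(String targetDirectory);
    }

    // Records every call so the ordering can be inspected afterwards.
    static final class RecordingCoordinator implements Coordinator {
        final List<String> calls = new ArrayList<>();
        private final boolean failSavepoint;

        RecordingCoordinator(boolean failSavepoint) {
            this.failSavepoint = failSavepoint;
        }

        @Override public void stopScheduler() { calls.add("stop"); }
        @Override public void startScheduler() { calls.add("start"); }

        @Override
        public CompletableFuture<String> triggerSavepoint(String targetDirectory) {
            calls.add("savepoint");
            CompletableFuture<String> future = new CompletableFuture<>();
            if (failSavepoint) {
                future.completeExceptionally(new IllegalStateException("savepoint failed"));
            } else {
                future.complete(targetDirectory + "/savepoint-1");
            }
            return future;
        }
    }

    // Stop periodic checkpoints first, take the savepoint, then cancel.
    // On any failure in the chain, resume periodic checkpoints and propagate the error.
    static CompletableFuture<String> triggerSavepoint(
            Coordinator coordinator, String targetDirectory, boolean cancelJob, Runnable cancel) {
        if (cancelJob) {
            coordinator.stopScheduler();
        }
        return coordinator.triggerSavepoint(targetDirectory)
            .thenApply(path -> {
                if (cancelJob) {
                    cancel.run();
                }
                return path;
            })
            .exceptionally(throwable -> {
                if (cancelJob) {
                    coordinator.startScheduler();
                }
                throw new CompletionException(throwable);
            });
    }

    public static void main(String[] args) {
        RecordingCoordinator ok = new RecordingCoordinator(false);
        String path = triggerSavepoint(ok, "/tmp", true, () -> ok.calls.add("cancel")).join();
        System.out.println(path + " " + ok.calls);

        RecordingCoordinator failing = new RecordingCoordinator(true);
        triggerSavepoint(failing, "/tmp", true, () -> failing.calls.add("cancel"))
            .handle((p, t) -> null) // swallow the error so main does not throw
            .join();
        System.out.println(failing.calls);
    }
}
```

Because the scheduler is stopped before the savepoint is requested, no periodic checkpoint can be scheduled between the savepoint and the cancelation in this sketch; with `cancelJob` set to `false` neither the scheduler nor the cancel callback is touched.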
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383616#comment-16383616 ] ASF GitHub Bot commented on FLINK-8459: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171857517 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) { @Override public CompletableFuture<String> triggerSavepoint( - @Nullable final String targetDirectory, - final Time timeout) { - try { - return executionGraph.getCheckpointCoordinator() - .triggerSavepoint(System.currentTimeMillis(), targetDirectory) - .thenApply(CompletedCheckpoint::getExternalPointer); - } catch (Exception e) { - return FutureUtils.completedExceptionally(e); + @Nullable final String targetDirectory, + final boolean cancelJob, + final Time timeout) { + + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + if (checkpointCoordinator == null) { + return FutureUtils.completedExceptionally(new IllegalStateException( + String.format("Job %s is not a streaming job.", jobGraph.getJobID()))); + } + + if (cancelJob) { + checkpointCoordinator.stopCheckpointScheduler(); + } + return checkpointCoordinator + .triggerSavepoint(System.currentTimeMillis(), targetDirectory) + .thenApply(CompletedCheckpoint::getExternalPointer) + .thenApplyAsync(path -> { + if (cancelJob) { + log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID()); + cancel(timeout); + } + return path; + }, getMainThreadExecutor()) + .exceptionally(throwable -> { + if (cancelJob) { + startCheckpointScheduler(checkpointCoordinator); --- End diff -- If the cancelation failed, we restart the scheduler as well. I think this differs from the previous implementation.
> Implement cancelWithSavepoint in RestClusterClient > -- > > Key: FLINK-8459 > URL: https://issues.apache.org/jira/browse/FLINK-8459 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Implement the method > {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String > savepointDirectory)}}. > by either taking a savepoint and cancel the job separately, or by migrating > the logic in {{JobCancellationWithSavepointHandlers}}. The former will have > different semantics because the checkpoint scheduler is not stopped. Thus it > is not guaranteed that there won't be additional checkpoints between the > savepoint and the job cancelation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
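The reviewer's observation follows from where the `exceptionally` handler sits in the chain: a handler attached after the cancel stage also fires when that stage throws. The sketch below uses only `java.util.concurrent`; the class and method names are hypothetical, and an already-completed future stands in for a successful savepoint. It contrasts the two placements to illustrate `CompletableFuture` semantics, and makes no claim about which behavior the PR should adopt.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only; the "restart" is modelled as a counter.
class ExceptionallyScopeSketch {

    // Handler attached after the cancel stage: it sees failures of both stages.
    static int restartsWhenHandlerIsLast(boolean cancelFails) {
        AtomicInteger restarts = new AtomicInteger();
        CompletableFuture<String> result = CompletableFuture.completedFuture("/tmp/savepoint-1")
            .thenApply(path -> {
                if (cancelFails) {
                    throw new IllegalStateException("cancel failed");
                }
                return path;
            })
            .exceptionally(throwable -> {
                restarts.incrementAndGet();
                throw new CompletionException(throwable);
            });
        result.handle((p, t) -> null).join(); // wait without rethrowing
        return restarts.get();
    }

    // Handler attached directly to the savepoint future: a later cancel failure bypasses it.
    static int restartsWhenHandlerIsFirst(boolean cancelFails) {
        AtomicInteger restarts = new AtomicInteger();
        CompletableFuture<String> result = CompletableFuture.completedFuture("/tmp/savepoint-1")
            .whenComplete((path, throwable) -> {
                if (throwable != null) {
                    restarts.incrementAndGet();
                }
            })
            .thenApply(path -> {
                if (cancelFails) {
                    throw new IllegalStateException("cancel failed");
                }
                return path;
            });
        result.handle((p, t) -> null).join(); // wait without rethrowing
        return restarts.get();
    }

    public static void main(String[] args) {
        System.out.println("handler last, cancel fails:  " + restartsWhenHandlerIsLast(true));
        System.out.println("handler first, cancel fails: " + restartsWhenHandlerIsFirst(true));
    }
}
```

With the handler last, a failing cancel stage counts one restart; with the handler placed directly on the savepoint future, the same failure counts none.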
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383612#comment-16383612 ] ASF GitHub Bot commented on FLINK-8459: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171857387 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) { @Override public CompletableFuture<String> triggerSavepoint( - @Nullable final String targetDirectory, - final Time timeout) { - try { - return executionGraph.getCheckpointCoordinator() - .triggerSavepoint(System.currentTimeMillis(), targetDirectory) - .thenApply(CompletedCheckpoint::getExternalPointer); - } catch (Exception e) { - return FutureUtils.completedExceptionally(e); + @Nullable final String targetDirectory, + final boolean cancelJob, + final Time timeout) { + + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + if (checkpointCoordinator == null) { + return FutureUtils.completedExceptionally(new IllegalStateException( + String.format("Job %s is not a streaming job.", jobGraph.getJobID()))); --- End diff -- If the job is in a terminal state, the coordinator will be `null`ed as well. > Implement cancelWithSavepoint in RestClusterClient > -- > > Key: FLINK-8459 > URL: https://issues.apache.org/jira/browse/FLINK-8459 > Project: Flink > Issue Type: Sub-task > Components: Client >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Implement the method > {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String > savepointDirectory)}}. > by either taking a savepoint and cancel the job separately, or by migrating > the logic in {{JobCancellationWithSavepointHandlers}}. The former will have > different semantics because the checkpoint scheduler is not stopped. Thus it > is not guaranteed that there won't be additional checkpoints between the > savepoint and the job cancelation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
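One way to act on this remark would be to choose the error message based on the job's state, since a missing coordinator does not by itself prove that the job is not a streaming job. The following is a hypothetical sketch: the `Status` enum, `describeMissingCoordinator`, `failedSavepoint`, and both message texts are assumptions made for illustration, not what the PR ended up doing.

```java
import java.util.concurrent.CompletableFuture;

// Illustrative sketch only; Status is a simplified stand-in, not Flink's JobStatus.
class MissingCoordinatorSketch {

    enum Status {
        RUNNING(false), FINISHED(true), CANCELED(true), FAILED(true);

        final boolean terminal;

        Status(boolean terminal) {
            this.terminal = terminal;
        }
    }

    // Picks a message that matches the likely reason the coordinator is null.
    static String describeMissingCoordinator(String jobId, Status status) {
        if (status.terminal) {
            return String.format(
                "Job %s is in terminal state %s, so no savepoint can be taken.", jobId, status);
        }
        return String.format(
            "Job %s has no checkpoint coordinator; checkpointing may not be enabled.", jobId);
    }

    // Returns an already-failed future carrying the chosen message.
    static CompletableFuture<String> failedSavepoint(String jobId, Status status) {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.completeExceptionally(
            new IllegalStateException(describeMissingCoordinator(jobId, status)));
        return future;
    }

    public static void main(String[] args) {
        System.out.println(describeMissingCoordinator("job-1", Status.CANCELED));
        System.out.println(describeMissingCoordinator("job-1", Status.RUNNING));
    }
}
```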
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383610#comment-16383610 ]

ASF GitHub Bot commented on FLINK-8459:

GitHub user GJL opened a pull request:

    https://github.com/apache/flink/pull/5622

    [FLINK-8459][flip6] Implement RestClusterClient.cancelWithSavepoint

    ## What is the purpose of the change

    *Introduce cancelJob flag to existing triggerSavepoint methods in Dispatcher and JobMaster. Stop checkpoint scheduler before taking savepoint to make sure that the savepoint created by this command is the last one.*

    cc: @tillrohrmann

    ## Brief change log

      - *Implement RestClusterClient.cancelWithSavepoint*

    ## Verifying this change

    This change added tests and can be verified as follows:

      - *Added `JobMasterTriggerSavepointIT`.*
      - *Manually tested.*

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
      - The S3 file system connector: (yes / **no** / don't know)

    ## Documentation

      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/GJL/flink FLINK-8459-2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5622.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5622

commit 7e913b0d1eab8453279ffacc11f4633b9263190d
Author: gyao
Date:   2018-03-02T14:11:36Z

    [FLINK-8459][flip6] Implement RestClusterClient.cancelWithSavepoint

    Introduce cancelJob flag to existing triggerSavepoint methods in Dispatcher and
    JobMaster. Stop checkpoint scheduler before taking savepoint to make sure that
    the savepoint created by this command is the last one.
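The ordering described in the pull request (stop the checkpoint scheduler, take the savepoint, then cancel) can be sketched roughly as follows. This is a simplified illustration and not the code from the pull request. `SketchCoordinator`, `SavepointThenCancelSketch`, and the `cancelAction` callback are hypothetical stand-ins rather than Flink classes. Restarting the scheduler when the savepoint fails is an assumption of this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical stand-in for a checkpoint coordinator; not Flink's CheckpointCoordinator. */
interface SketchCoordinator {
    void stopCheckpointScheduler();

    void startCheckpointScheduler();

    CompletableFuture<String> triggerSavepoint(String targetDirectory);
}

final class SavepointThenCancelSketch {
    private SavepointThenCancelSketch() {}

    static CompletableFuture<String> triggerSavepoint(
            SketchCoordinator coordinator, // may be null, as in the diff under review
            String targetDirectory,
            boolean cancelJob,
            Runnable cancelAction) { // hypothetical callback that cancels the job
        if (coordinator == null) {
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                new IllegalStateException("No checkpoint coordinator available."));
            return failed;
        }
        if (cancelJob) {
            // Stop periodic checkpoints first so the savepoint is the last snapshot.
            coordinator.stopCheckpointScheduler();
        }
        return coordinator.triggerSavepoint(targetDirectory).handle((path, error) -> {
            if (error != null) {
                if (cancelJob) {
                    // Assumption: the job keeps running, so resume periodic checkpoints.
                    coordinator.startCheckpointScheduler();
                }
                throw new CompletionException(error);
            }
            if (cancelJob) {
                // Cancel only after the savepoint has completed successfully.
                cancelAction.run();
            }
            return path;
        });
    }
}
```

With `cancelJob` set to `false`, the method behaves like a plain savepoint trigger and leaves the scheduler untouched, which is why a single flag on the existing method is enough to cover both commands.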
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374085#comment-16374085 ]

ASF GitHub Bot commented on FLINK-8459:

Github user yanghua commented on the issue:

    https://github.com/apache/flink/pull/5565

    Hi @tillrohrmann, would you please review this PR? Thanks.

> Implement cancelWithSavepoint in RestClusterClient
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
> Issue Type: Sub-task
> Components: Client
> Affects Versions: 1.5.0
> Reporter: Gary Yao
> Assignee: vinoyang
> Priority: Blocker
> Labels: flip-6
> Fix For: 1.5.0
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374012#comment-16374012 ]

ASF GitHub Bot commented on FLINK-8459:

GitHub user yanghua opened a pull request:

    https://github.com/apache/flink/pull/5565

    [FLINK-8459][flip6] Implement cancelWithSavepoint in RestClusterClient

    ## What is the purpose of the change

    This pull request implements cancelWithSavepoint in RestClusterClient for flip6.

    ## Brief change log

      - Split the function into two steps: send the REST call `triggerSavepoint`, then send the REST call `cancel`.

    ## Verifying this change

    This change added tests and can be verified as follows:

      - Added a test that checks the savepoint path and that the job's termination state is canceled.

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)

    ## Documentation

      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yanghua/flink FLINK-8459

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5565.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5565

commit a4b97d8f305bed09c154fb88ba8e06879532e1ed
Author: vinoyang
Date:   2018-02-23T06:52:11Z

    [FLINK-8459][flip6] Implement cancelWithSavepoint in RestClusterClient
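The two-step approach from this pull request's change log can be sketched as a client-side composition of two asynchronous calls. The `SketchJobClient` interface and its method signatures are hypothetical and only mirror the names of the two REST calls mentioned above; they are not the `RestClusterClient` API.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical client exposing the two existing calls; not Flink's RestClusterClient. */
interface SketchJobClient {
    CompletableFuture<String> triggerSavepoint(String jobId, String savepointDirectory);

    CompletableFuture<Void> cancel(String jobId);
}

final class TwoStepCancelWithSavepoint {
    private TwoStepCancelWithSavepoint() {}

    /**
     * Takes a savepoint and, only if that succeeds, cancels the job.
     * The returned future completes with the savepoint path.
     */
    static CompletableFuture<String> cancelWithSavepoint(
            SketchJobClient client, String jobId, String savepointDirectory) {
        return client.triggerSavepoint(jobId, savepointDirectory)
            // thenCompose runs only on success, so a failed savepoint never cancels the job.
            .thenCompose(path -> client.cancel(jobId).thenApply(ignored -> path));
    }
}
```

As the issue description points out, this variant does not stop the checkpoint scheduler, so additional checkpoints may still be taken between the savepoint and the cancellation.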
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364044#comment-16364044 ]

Till Rohrmann commented on FLINK-8459:

Yes, the better solution in terms of guarantees would be issuing a single call and then executing the logic on the {{JobMaster}}. However, this won't strictly give you exactly-once processing guarantees, because we don't yet support a proper cancel-with-savepoint command on the {{JobMaster}}. After the savepoint, the {{Tasks}} might still process some more records before they are cancelled.

Therefore, both variants would be OK for me: doing the cancel with savepoint in a single REST call, or splitting it into the two existing REST calls {{triggerSavepoint}} and {{cancel}}.
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360758#comment-16360758 ]

vinoyang commented on FLINK-8459:

Hi [~till.rohrmann], my idea is similar to yours in a way. It follows how the JobManager handles the CancelJobWithSavepoint message in older versions (such as v1.3.2). The request contains two actions, triggering the savepoint and cancelling the job, and the cancel should happen only after the savepoint has completed successfully.
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360573#comment-16360573 ]

Till Rohrmann commented on FLINK-8459:

Hi [~yanghua], thanks for working on this issue. How do you want to solve it? We have thought about a simple solution where the cancel with savepoint would effectively consist of two calls from the client: one for taking the savepoint and another for cancelling the job.
[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient
[ https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360352#comment-16360352 ]

vinoyang commented on FLINK-8459:

Hi [~gjy], are you handling this issue? If not, can I assign it to myself?

> Implement cancelWithSavepoint in RestClusterClient
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
> Issue Type: Sub-task
> Components: Client
> Affects Versions: 1.5.0
> Reporter: Gary Yao
> Priority: Blocker
> Labels: flip-6
> Fix For: 1.5.0