[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384536#comment-16384536
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5622


> Implement cancelWithSavepoint in RestClusterClient
> --
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
> Issue Type: Sub-task
> Components: Client
> Affects Versions: 1.5.0
> Reporter: Gary Yao
> Assignee: Gary Yao
> Priority: Blocker
> Labels: flip-6
> Fix For: 1.5.0, 1.6.0
>
>
> Implement the method
> {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String 
> savepointDirectory)}}
> by either taking a savepoint and canceling the job separately, or by migrating 
> the logic in {{JobCancellationWithSavepointHandlers}}. The former will have 
> different semantics because the checkpoint scheduler is not stopped. Thus, it 
> is not guaranteed that there won't be additional checkpoints between the 
> savepoint and the job cancelation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1638#comment-1638
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5565
  
@GJL OK, that's fine. I will close this PR now.




[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16384445#comment-16384445
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

Github user yanghua closed the pull request at:

https://github.com/apache/flink/pull/5565




[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383760#comment-16383760
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171888774
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBody.java
 ---
@@ -36,15 +36,26 @@
@Nullable
private final String targetDirectory;
 
+   private final boolean cancelJob;
--- End diff --

fixed




[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383745#comment-16383745
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171886987
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBody.java
 ---
@@ -36,15 +36,26 @@
@Nullable
private final String targetDirectory;
 
+   private final boolean cancelJob;
--- End diff --

Not annotated with `@JsonProperty`




[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383628#comment-16383628
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

Github user GJL commented on the issue:

https://github.com/apache/flink/pull/5565
  
@yanghua Thanks for your contribution, but I am afraid we have changed our 
minds: we decided that it is better not to allow checkpoints between the 
savepoint and the job cancelation.

#5622 already implements this.

cc: @tillrohrmann 






[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383624#comment-16383624
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171858873
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.isOneOf;
+
+/**
+ * Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean, Time)}.
+ *
+ * @see org.apache.flink.runtime.jobmaster.JobMaster
+ */
+@Category(Flip6.class)
+public class JobMasterTriggerSavepointIT extends AbstractTestBase {
+
+   private static CountDownLatch invokeLatch;
+
+   private static volatile CountDownLatch triggerCheckpointLatch;
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private Path savepointDirectory;
+   private MiniClusterClient clusterClient;
+   private JobGraph jobGraph;
+
+   @Before
+   public void setUp() throws Exception {
+   invokeLatch = new CountDownLatch(1);
+   triggerCheckpointLatch = new CountDownLatch(1);
+   savepointDirectory = temporaryFolder.newFolder().toPath();
+
+   Assume.assumeTrue(
+   "ClusterClient is not an instance of MiniClusterClient",
+   miniClusterResource.getClusterClient() instanceof MiniClusterClient);
+
+   clusterClient = (MiniClusterClient) miniClusterResource.getClusterClient();
+   clusterClient.setDetached(true);
+
+   jobGraph = new JobGraph();
+
+   final JobVertex vertex = new JobVertex("testVertex")

[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383622#comment-16383622
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171858593
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.isOneOf;
+
+/**
+ * Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean, Time)}.
+ *
+ * @see org.apache.flink.runtime.jobmaster.JobMaster
+ */
+@Category(Flip6.class)
+public class JobMasterTriggerSavepointIT extends AbstractTestBase {
+
+   private static CountDownLatch invokeLatch;
+
+   private static volatile CountDownLatch triggerCheckpointLatch;
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private Path savepointDirectory;
+   private MiniClusterClient clusterClient;
+   private JobGraph jobGraph;
+
+   @Before
+   public void setUp() throws Exception {
+   invokeLatch = new CountDownLatch(1);
+   triggerCheckpointLatch = new CountDownLatch(1);
+   savepointDirectory = temporaryFolder.newFolder().toPath();
+
+   Assume.assumeTrue(
+   "ClusterClient is not an instance of MiniClusterClient",
+   miniClusterResource.getClusterClient() instanceof MiniClusterClient);
+
+   clusterClient = (MiniClusterClient) miniClusterResource.getClusterClient();
+   clusterClient.setDetached(true);
+
+   jobGraph = new JobGraph();
+
+   final JobVertex vertex = new JobVertex("testVertex")

[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383623#comment-16383623
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171858662
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
 ---
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.MiniClusterClient;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.checkpoint.CheckpointTriggerException;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.testutils.category.Flip6;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.isOneOf;
+
+/**
+ * Tests for {@link org.apache.flink.runtime.jobmaster.JobMaster#triggerSavepoint(String, boolean, Time)}.
+ *
+ * @see org.apache.flink.runtime.jobmaster.JobMaster
+ */
+@Category(Flip6.class)
+public class JobMasterTriggerSavepointIT extends AbstractTestBase {
+
+   private static CountDownLatch invokeLatch;
+
+   private static volatile CountDownLatch triggerCheckpointLatch;
+
+   @Rule
+   public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+   private Path savepointDirectory;
+   private MiniClusterClient clusterClient;
+   private JobGraph jobGraph;
+
+   @Before
+   public void setUp() throws Exception {
+   invokeLatch = new CountDownLatch(1);
+   triggerCheckpointLatch = new CountDownLatch(1);
+   savepointDirectory = temporaryFolder.newFolder().toPath();
+
+   Assume.assumeTrue(
--- End diff --

This shouldn't happen if the category is `flip6`.



[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383619#comment-16383619
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171858158
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointTriggerRequestBodyTest.java
 ---
@@ -34,8 +34,8 @@
     }

     @Override
-    protected SavepointTriggerRequestBody getTestRequestInstance() throws Exception {
-        return new SavepointTriggerRequestBody("/tmp");
+    protected SavepointTriggerRequestBody getTestRequestInstance() {
+        return new SavepointTriggerRequestBody("/tmp", true);
--- End diff --

Strictly speaking, the `false` case should be tested as well.
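
To illustrate why covering only `true` can hide a defect, here is a minimal sketch. It does not use Flink's marshalling test base or JSON at all: `Body`, `encode`, `decode` and `buggyDecode` are hypothetical stand-ins with a toy wire format. The point is only that a decoder which drops the flag and always yields `true` still passes a round trip for `true`.

```java
import java.util.function.Function;

class BothFlagValuesSketch {

    /** Hypothetical stand-in for a request body: a directory plus a boolean flag. */
    static final class Body {
        final String targetDirectory;
        final boolean cancelJob;

        Body(String targetDirectory, boolean cancelJob) {
            this.targetDirectory = targetDirectory;
            this.cancelJob = cancelJob;
        }

        boolean sameAs(Body other) {
            return targetDirectory.equals(other.targetDirectory) && cancelJob == other.cancelJob;
        }
    }

    /** Toy wire format for this sketch only (not Flink's JSON). */
    static String encode(Body body) {
        return body.targetDirectory + "|" + body.cancelJob;
    }

    /** Correct decoder: restores both fields. */
    static Body decode(String wire) {
        int sep = wire.lastIndexOf('|');
        return new Body(wire.substring(0, sep), Boolean.parseBoolean(wire.substring(sep + 1)));
    }

    /** Deliberately buggy decoder: ignores the flag and always reports true. */
    static Body buggyDecode(String wire) {
        int sep = wire.lastIndexOf('|');
        return new Body(wire.substring(0, sep), true);
    }

    static boolean roundTrips(Function<String, Body> decoder, Body original) {
        return original.sameAs(decoder.apply(encode(original)));
    }

    public static void main(String[] args) {
        // Exercise both flag values; only the false case exposes the buggy decoder.
        for (boolean flag : new boolean[] {true, false}) {
            Body body = new Body("/tmp", flag);
            System.out.println("cancelJob=" + flag
                + " correct=" + roundTrips(BothFlagValuesSketch::decode, body)
                + " buggy=" + roundTrips(BothFlagValuesSketch::buggyDecode, body));
        }
    }
}
```

With the `true` instance alone, both decoders look fine; the `false` instance is the one that tells them apart.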




[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383618#comment-16383618
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171857970
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) {

     @Override
     public CompletableFuture<String> triggerSavepoint(
-            @Nullable final String targetDirectory,
-            final Time timeout) {
-        try {
-            return executionGraph.getCheckpointCoordinator()
-                .triggerSavepoint(System.currentTimeMillis(), targetDirectory)
-                .thenApply(CompletedCheckpoint::getExternalPointer);
-        } catch (Exception e) {
-            return FutureUtils.completedExceptionally(e);
+            @Nullable final String targetDirectory,
+            final boolean cancelJob,
+            final Time timeout) {
+
+        final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+        if (checkpointCoordinator == null) {
+            return FutureUtils.completedExceptionally(new IllegalStateException(
+                String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
+        }
+
+        if (cancelJob) {
+            checkpointCoordinator.stopCheckpointScheduler();
+        }
+        return checkpointCoordinator
+            .triggerSavepoint(System.currentTimeMillis(), targetDirectory)
+            .thenApply(CompletedCheckpoint::getExternalPointer)
+            .thenApplyAsync(path -> {
+                if (cancelJob) {
+                    log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
+                    cancel(timeout);
+                }
+                return path;
+            }, getMainThreadExecutor())
+            .exceptionally(throwable -> {
+                if (cancelJob) {
+                    startCheckpointScheduler(checkpointCoordinator);
+                }
+                throw new CompletionException(throwable);
+            });
+    }
+
+    private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
--- End diff --

Method can be reused in the job rescaling logic.




[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383616#comment-16383616
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171857517
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) {

     @Override
     public CompletableFuture<String> triggerSavepoint(
-            @Nullable final String targetDirectory,
-            final Time timeout) {
-        try {
-            return executionGraph.getCheckpointCoordinator()
-                .triggerSavepoint(System.currentTimeMillis(), targetDirectory)
-                .thenApply(CompletedCheckpoint::getExternalPointer);
-        } catch (Exception e) {
-            return FutureUtils.completedExceptionally(e);
+            @Nullable final String targetDirectory,
+            final boolean cancelJob,
+            final Time timeout) {
+
+        final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+        if (checkpointCoordinator == null) {
+            return FutureUtils.completedExceptionally(new IllegalStateException(
+                String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
+        }
+
+        if (cancelJob) {
+            checkpointCoordinator.stopCheckpointScheduler();
+        }
+        return checkpointCoordinator
+            .triggerSavepoint(System.currentTimeMillis(), targetDirectory)
+            .thenApply(CompletedCheckpoint::getExternalPointer)
+            .thenApplyAsync(path -> {
+                if (cancelJob) {
+                    log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
+                    cancel(timeout);
+                }
+                return path;
+            }, getMainThreadExecutor())
+            .exceptionally(throwable -> {
+                if (cancelJob) {
+                    startCheckpointScheduler(checkpointCoordinator);
--- End diff --

If the cancelation fails, the scheduler is restarted as well. I think this 
differs from the previous implementation.
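
The behavior described here follows from where `exceptionally` sits in the chain: a handler attached after the cancel stage sees failures from the savepoint stage and from the cancel stage alike. The sketch below models that with plain `CompletableFuture`s. It is not the JobMaster code: the class name, the `restarts` counter and the boolean failure switches are hypothetical, a failing cancelation is modeled as an exception thrown inside the stage, and the stages run synchronously to keep the example deterministic.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;

class SchedulerRestartSketch {

    /** Hypothetical stand-in for "restart the checkpoint scheduler"; counts calls. */
    static final AtomicInteger restarts = new AtomicInteger();

    /**
     * Mirrors the shape of the chain in the diff: savepoint stage, cancel stage,
     * then one trailing exceptionally() that restarts the scheduler.
     */
    static CompletableFuture<String> savepointThenCancel(boolean savepointFails, boolean cancelFails) {
        CompletableFuture<String> savepoint = new CompletableFuture<>();
        if (savepointFails) {
            savepoint.completeExceptionally(new IllegalStateException("savepoint failed"));
        } else {
            savepoint.complete("/tmp/savepoint-1");
        }
        return savepoint
            .thenApply(path -> {
                // Assumption: a failed cancelation surfaces as an exception in this stage.
                if (cancelFails) {
                    throw new IllegalStateException("cancel failed");
                }
                return path;
            })
            .exceptionally(throwable -> {
                // Runs for a failure in either of the stages above.
                restarts.incrementAndGet();
                throw new CompletionException(throwable);
            });
    }

    public static void main(String[] args) {
        savepointThenCancel(false, false);
        System.out.println("restarts after success: " + restarts.get());
        savepointThenCancel(true, false);
        System.out.println("restarts after savepoint failure: " + restarts.get());
        savepointThenCancel(false, true);
        System.out.println("restarts after cancel failure: " + restarts.get());
    }
}
```

If the scheduler should only be restarted when the savepoint itself fails, one option would be to attach the handler before the cancel stage instead; whether that matches the previous implementation is exactly the open question in this comment.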




[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383612#comment-16383612
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5622#discussion_r171857387
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
---
@@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) {

     @Override
     public CompletableFuture<String> triggerSavepoint(
-            @Nullable final String targetDirectory,
-            final Time timeout) {
-        try {
-            return executionGraph.getCheckpointCoordinator()
-                .triggerSavepoint(System.currentTimeMillis(), targetDirectory)
-                .thenApply(CompletedCheckpoint::getExternalPointer);
-        } catch (Exception e) {
-            return FutureUtils.completedExceptionally(e);
+            @Nullable final String targetDirectory,
+            final boolean cancelJob,
+            final Time timeout) {
+
+        final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+        if (checkpointCoordinator == null) {
+            return FutureUtils.completedExceptionally(new IllegalStateException(
+                String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
--- End diff --

If the job is in a terminal state, the coordinator will be `null`ed as well.
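
One possible reading of this remark is that the "not a streaming job" message can mislead when the coordinator is missing because the job has already finished. A minimal sketch of telling the two cases apart follows. Everything in it is hypothetical: `Status` is a simplified stand-in rather than Flink's `JobStatus`, and `describeMissingCoordinator` is an illustrative helper, not a method from the pull request.

```java
class MissingCoordinatorMessageSketch {

    /** Simplified, hypothetical job status; not Flink's JobStatus enum. */
    enum Status {
        RUNNING, FINISHED, CANCELED, FAILED;

        boolean isTerminal() {
            return this != RUNNING;
        }
    }

    /** Picks an error message for the case where no checkpoint coordinator is available. */
    static String describeMissingCoordinator(String jobId, Status status) {
        if (status.isTerminal()) {
            // The coordinator was discarded because the job is already done.
            return String.format("Job %s is already in terminal state %s.", jobId, status);
        }
        // Otherwise the job never had a coordinator in the first place.
        return String.format("Job %s is not a streaming job.", jobId);
    }

    public static void main(String[] args) {
        System.out.println(describeMissingCoordinator("job-1", Status.RUNNING));
        System.out.println(describeMissingCoordinator("job-1", Status.CANCELED));
    }
}
```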


> Implement cancelWithSavepoint in RestClusterClient
> --
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Implement the method
> {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String 
> savepointDirectory)}}.
> by either taking a savepoint and cancel the job separately, or by migrating 
> the logic in {{JobCancellationWithSavepointHandlers}}. The former will have 
> different semantics because the checkpoint scheduler is not stopped. Thus it 
> is not guaranteed that there won't be additional checkpoints between the 
> savepoint and the job cancelation.





[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-03-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16383610#comment-16383610
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5622

[FLINK-8459][flip6] Implement RestClusterClient.cancelWithSavepoint

## What is the purpose of the change

*Introduce a `cancelJob` flag to the existing `triggerSavepoint` methods in Dispatcher and JobMaster. Stop the checkpoint scheduler before taking the savepoint to make sure that the savepoint created by this command is the last one.*
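
The ordering described above can be sketched as follows. This is only an illustration under stated assumptions: the `Coordinator` interface, the `cancelAction` callback, and the decision to restart the scheduler when the savepoint fails are all hypothetical and are not taken from the pull request.

```java
import java.util.concurrent.CompletableFuture;

public class StopSchedulerThenSavepoint {

    /** Hypothetical, minimal view of a checkpoint coordinator; not Flink's class. */
    public interface Coordinator {
        void stopScheduler();
        void startScheduler();
        CompletableFuture<String> triggerSavepoint(String targetDirectory);
    }

    /**
     * Triggers a savepoint. With cancelJob set, periodic checkpoints are stopped
     * first so that no checkpoint can complete after the savepoint, and the job
     * is cancelled only once the savepoint has succeeded.
     */
    public static CompletableFuture<String> triggerSavepoint(
            Coordinator coordinator,
            Runnable cancelAction,
            String targetDirectory,
            boolean cancelJob) {
        if (cancelJob) {
            coordinator.stopScheduler();
        }
        return coordinator.triggerSavepoint(targetDirectory)
            .whenComplete((path, failure) -> {
                if (!cancelJob) {
                    return;
                }
                if (failure == null) {
                    cancelAction.run();
                } else {
                    // Assumption: the job keeps running, so resume periodic checkpoints.
                    coordinator.startScheduler();
                }
            });
    }
}
```

Returning the future from `whenComplete` keeps the savepoint path (or the failure) visible to the caller, so a failed savepoint never results in a cancelled job.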

cc: @tillrohrmann 

## Brief change log

  - *Implement RestClusterClient.cancelWithSavepoint*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added `JobMasterTriggerSavepointIT`.*
  - *Manually tested.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8459-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5622.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5622


commit 7e913b0d1eab8453279ffacc11f4633b9263190d
Author: gyao 
Date:   2018-03-02T14:11:36Z

[FLINK-8459][flip6] Implement RestClusterClient.cancelWithSavepoint

Introduce cancelJob flag to existing triggerSavepoint methods in Dispatcher and JobMaster. Stop checkpoint scheduler before taking savepoint to make sure that the savepoint created by this command is the last one.









[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-02-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374085#comment-16374085
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5565
  
Hi @tillrohrmann, would you please review this PR? Thanks.


> Implement cancelWithSavepoint in RestClusterClient
> --
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Implement the method
> {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String 
> savepointDirectory)}}.
> by either taking a savepoint and cancel the job separately, or by migrating 
> the logic in {{JobCancellationWithSavepointHandlers}}. The former will have 
> different semantics because the checkpoint scheduler is not stopped. Thus it 
> is not guaranteed that there won't be additional checkpoints between the 
> savepoint and the job cancelation.





[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-02-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16374012#comment-16374012
 ] 

ASF GitHub Bot commented on FLINK-8459:
---

GitHub user yanghua opened a pull request:

https://github.com/apache/flink/pull/5565

[FLINK-8459][flip6] Implement cancelWithSavepoint in RestClusterClient

## What is the purpose of the change

This pull request implements `cancelWithSavepoint` in `RestClusterClient` for FLIP-6.


## Brief change log

  - Split the operation into two steps: send the REST call `triggerSavepoint`, then send the REST call `cancel`.
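
The two-step approach can be sketched as below. The `JobClient` interface and its method signatures are hypothetical stand-ins for the two REST calls and are not the `RestClusterClient` API. As the issue description notes, nothing in this variant stops the checkpoint scheduler, so further checkpoints may complete between the two calls.

```java
import java.util.concurrent.CompletableFuture;

public class TwoStepCancelWithSavepoint {

    /** Hypothetical stand-in for the two REST calls; not Flink's client API. */
    public interface JobClient {
        CompletableFuture<String> triggerSavepoint(String jobId, String savepointDirectory);
        CompletableFuture<Void> cancel(String jobId);
    }

    /**
     * Takes a savepoint, then cancels the job. The cancel call is issued only
     * if the savepoint succeeded; the returned future carries the savepoint path.
     */
    public static CompletableFuture<String> cancelWithSavepoint(
            JobClient client, String jobId, String savepointDirectory) {
        return client.triggerSavepoint(jobId, savepointDirectory)
            .thenCompose(path -> client.cancel(jobId).thenApply(ignored -> path));
    }

    public static void main(String[] args) {
        // In-memory fake that completes both calls immediately.
        JobClient fake = new JobClient() {
            public CompletableFuture<String> triggerSavepoint(String jobId, String dir) {
                return CompletableFuture.completedFuture(dir + "/savepoint-" + jobId);
            }
            public CompletableFuture<Void> cancel(String jobId) {
                return CompletableFuture.completedFuture(null);
            }
        };
        System.out.println(cancelWithSavepoint(fake, "job-1", "/tmp/savepoints").join());
    }
}
```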

## Verifying this change

This change added tests and can be verified as follows:

  - Added a test that checks the savepoint path and that the job's termination state is canceled.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yanghua/flink FLINK-8459

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5565.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5565


commit a4b97d8f305bed09c154fb88ba8e06879532e1ed
Author: vinoyang 
Date:   2018-02-23T06:52:11Z

[FLINK-8459][flip6] Implement cancelWithSavepoint in RestClusterClient









[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-02-14 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16364044#comment-16364044
 ] 

Till Rohrmann commented on FLINK-8459:
--

Yes, the better solution in terms of guarantees would be to issue a single call 
and then execute the logic on the {{JobMaster}}. However, this won't strictly 
give you exactly-once processing guarantees, because we don't yet support a 
proper cancel-with-savepoint command on the {{JobMaster}}. After the savepoint, 
the {{Tasks}} might still process some more records before they are cancelled. 
Therefore, both variants, executing cancel with savepoint with a single REST 
call or splitting it into the two existing REST calls {{triggerSavepoint}} and 
{{cancel}}, would be ok for me.






[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-02-12 Thread vinoyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360758#comment-16360758
 ] 

vinoyang commented on FLINK-8459:
-

Hi [~till.rohrmann], my idea is similar to yours in a way: it follows how the 
JobManager handles the CancelJobWithSavepoint message in older versions (such 
as v1.3.2). The request contains two actions, triggering a savepoint and 
cancelling the job, and the cancel should happen only after the savepoint has 
completed successfully.






[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-02-12 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360573#comment-16360573
 ] 

Till Rohrmann commented on FLINK-8459:
--

Hi [~yanghua], thanks for working on this issue. How do you want to solve it? We 
have thought about a simple solution where cancel with savepoint would 
effectively consist of two calls from the client: one for taking the savepoint 
and the other for cancelling the job.






[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-02-11 Thread vinoyang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16360352#comment-16360352
 ] 

vinoyang commented on FLINK-8459:
-

Hi [~gjy], are you handling this issue? If not, can I assign it to myself?

> Implement cancelWithSavepoint in RestClusterClient
> --
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Implement the method
> {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String 
> savepointDirectory)}}.
> by either taking a savepoint and cancel the job separately, or by migrating 
> the logic in {{JobCancellationWithSavepointHandlers}}. The former will have 
> different semantics because the checkpoint scheduler is not stopped. Thus it 
> is not guaranteed that there won't be additional checkpoints between the 
> savepoint and the job cancelation.


