[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-13 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16325161#comment-16325161 ]

ASF GitHub Bot commented on FLINK-8317:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5223


> Enable Triggering of Savepoints via RestfulGateway
> --
>
> Key: FLINK-8317
> URL: https://issues.apache.org/jira/browse/FLINK-8317
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Enable triggering of savepoints in FLIP-6 mode via RestfulGateway:
> * Add method {{CompletableFuture triggerSavepoint(long timestamp, String targetDirectory)}} to the {{RestfulGateway}} interface
> * Implement the method in {{Dispatcher}} and {{JobMaster}}
> * Implement a new {{AbstractRestHandler}} which allows asynchronous triggering of savepoints
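The description asks for a new method on the RestfulGateway interface that is then implemented in Dispatcher and JobMaster. A minimal sketch of that shape, assuming a gateway interface whose default method rejects the call (the existing RestfulGateway defaults shown in the RestfulGateway diff in this thread throw UnsupportedOperationException) and a dispatcher that forwards the call to the master of the requested job. SimpleGateway, SimpleJobMaster and SimpleDispatcher are hypothetical names; String job IDs and String paths stand in for Flink's JobID and the real return type, and RPC and timeouts are not modeled.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for a gateway interface (not Flink's RestfulGateway).
interface SimpleGateway {
    // Default: gateways that cannot take savepoints reject the call.
    default CompletableFuture<String> triggerSavepoint(String jobId, String targetDirectory) {
        throw new UnsupportedOperationException();
    }
}

// Hypothetical job-level gateway that "draws" the savepoint.
class SimpleJobMaster implements SimpleGateway {
    @Override
    public CompletableFuture<String> triggerSavepoint(String jobId, String targetDirectory) {
        // Pretend the savepoint completed immediately and return a fake path.
        return CompletableFuture.completedFuture(targetDirectory + "/savepoint-" + jobId);
    }
}

// Hypothetical dispatcher that routes the request to the master of the job.
class SimpleDispatcher implements SimpleGateway {
    private final Map<String, SimpleGateway> jobMasters = new ConcurrentHashMap<>();

    void register(String jobId, SimpleGateway jobMaster) {
        jobMasters.put(jobId, jobMaster);
    }

    @Override
    public CompletableFuture<String> triggerSavepoint(String jobId, String targetDirectory) {
        SimpleGateway jobMaster = jobMasters.get(jobId);
        if (jobMaster == null) {
            // Report unknown jobs through the returned future instead of throwing.
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("Unknown job " + jobId));
            return failed;
        }
        return jobMaster.triggerSavepoint(jobId, targetDirectory);
    }
}
```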



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-13 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16325064#comment-16325064 ]

ASF GitHub Bot commented on FLINK-8317:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161368455
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java ---
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.savepoints;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.NotFoundException;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
+import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.SerializedThrowable;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.Immutable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * HTTP handlers for asynchronous triggering of savepoints.
+ *
+ * Drawing savepoints is a potentially long-running operation. To avoid blocking HTTP
+ * connections, savepoints must be drawn in two steps. First, an HTTP request is issued to trigger
+ * the savepoint asynchronously. The request will be assigned a {@link SavepointTriggerId},
+ * which is returned in the response body. Next, the returned id should be used to poll the status
+ * of the savepoint until it is finished.
+ *
+ * A savepoint is triggered by sending an HTTP {@code POST} request to
+ * {@code /jobs/:jobid/savepoints}. The HTTP request may contain a JSON body to specify the target
+ * directory of the savepoint, e.g.,
+ 
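The Javadoc above describes a two-step protocol: the POST returns a trigger id right away, and the client then polls with that id until the savepoint is finished. A minimal in-memory sketch of the server-side bookkeeping, assuming a plain ConcurrentHashMap keyed by random UUID strings. AsyncSavepointRegistry and PollStatus are hypothetical names, not the classes in this pull request. The handler's imports (Guava Cache/CacheBuilder, TimeUnit) suggest the real implementation keeps results in an expiring cache; this sketch never evicts entries.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical status values reported by the polling step.
enum PollStatus { IN_PROGRESS, COMPLETED, FAILED }

// Hypothetical registry: trigger() returns an id immediately,
// poll() reports the state of the future registered under that id.
class AsyncSavepointRegistry {
    private final Map<String, CompletableFuture<String>> inFlight = new ConcurrentHashMap<>();

    // Step 1: start the long-running operation without waiting for it.
    String trigger(Supplier<CompletableFuture<String>> savepointAction) {
        String triggerId = UUID.randomUUID().toString();
        inFlight.put(triggerId, savepointAction.get());
        return triggerId;
    }

    // Step 2: report the current status for a previously returned id.
    PollStatus poll(String triggerId) {
        CompletableFuture<String> future = inFlight.get(triggerId);
        if (future == null) {
            throw new IllegalArgumentException("Unknown trigger id " + triggerId);
        }
        if (!future.isDone()) {
            return PollStatus.IN_PROGRESS;
        }
        return future.isCompletedExceptionally() ? PollStatus.FAILED : PollStatus.COMPLETED;
    }

    // Savepoint location; only meaningful once poll() has returned COMPLETED.
    String location(String triggerId) {
        return inFlight.get(triggerId).join();
    }
}
```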

[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324084#comment-16324084 ]

ASF GitHub Bot commented on FLINK-8317:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161243072
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java ---

[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324087#comment-16324087 ]

ASF GitHub Bot commented on FLINK-8317:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161243207
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java ---
@@ -137,4 +138,21 @@
 			JobID jobId, @RpcTimeout Time timeout) {
 		throw new UnsupportedOperationException();
 	}
+
+	/**
+	 * Triggers a savepoint with the given savepoint directory as a target.
+	 *
+	 * @param targetDirectory Target directory for the savepoint.
+	 * @param timeout for the asynchronous operation
+	 * @return A future to the completed checkpoint
+	 * @throws IllegalStateException If no savepoint directory has been
+	 *                               specified and no default savepoint directory has been
+	 *                               configured
+	 */
+	default CompletableFuture triggerSavepoint(
--- End diff --

Cool :-)
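The @throws clause above implies a simple resolution rule for the savepoint target directory: use the requested directory if there is one, otherwise the configured default, otherwise fail. A sketch of that rule, assuming the configured default is passed in as a nullable string. SavepointTargets.resolveTargetDirectory is a hypothetical helper; where the real code performs this check, and whether it throws directly or fails the returned future, is not shown in the diff.

```java
// Hypothetical helper illustrating the documented fallback rule.
final class SavepointTargets {
    private SavepointTargets() {}

    static String resolveTargetDirectory(String requested, String configuredDefault) {
        if (requested != null) {
            return requested;          // an explicit target wins
        }
        if (configuredDefault != null) {
            return configuredDefault;  // fall back to the configured default
        }
        // Neither is available: mirrors the IllegalStateException in the Javadoc.
        throw new IllegalStateException(
            "No savepoint directory specified and no default savepoint directory configured");
    }
}
```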




[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324083#comment-16324083 ]

ASF GitHub Bot commented on FLINK-8317:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161242967
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java ---

[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324080#comment-16324080 ]

ASF GitHub Bot commented on FLINK-8317:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161242316
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java ---
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.savepoints;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
+import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for {@link SavepointHandlers}.
+ */
+public class SavepointHandlersTest {
+
+	private static final CompletableFuture<String> LOCAL_REST_ADDRESS =
+		CompletableFuture.completedFuture("localhost:12345");
+
+	private static final Time TIMEOUT = Time.seconds(10);
+
+	private static final JobID JOB_ID = new JobID();
+
+	private static final String COMPLETED_CHECKPOINT_EXTERNAL_POINTER = "/tmp/savepoint-0d2fb9-8d5e0106041a";
+
+	private static final String DEFAULT_REQUESTED_SAVEPOINT_TARGET_DIRECTORY = "/tmp";
+
+	@Mock
+	private RestfulGateway mockRestfulGateway;
+
+	private SavepointHandlers savepointHandlers;
+
+	private SavepointHandlers.SavepointTriggerHandler savepointTriggerHandler;
+
+	private SavepointHandlers.SavepointStatusHandler savepointStatusHandler;
+
+   private CompletedCheckpoint 

[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324076#comment-16324076 ]

ASF GitHub Bot commented on FLINK-8317:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161242236
  
--- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java ---
@@ -294,44 +306,108 @@ public void testSubmitJobAndWaitForExecutionResult() throws Exception {
 
 	@Test
 	public void testTriggerSavepoint() throws Exception {
-		String targetSavepointDirectory = "/alternate";
-		TestSavepointTriggerHandler triggerHandler = new TestSavepointTriggerHandler(targetSavepointDirectory);
-		try (TestRestServerEndpoint ignored = createRestServerEndpoint(triggerHandler)) {
+		final String targetSavepointDirectory = "/tmp";
+		final TestSavepointHandlers testSavepointHandlers = new TestSavepointHandlers();
+		final TestSavepointHandlers.TestSavepointTriggerHandler triggerHandler =
+			testSavepointHandlers.new TestSavepointTriggerHandler(
+				Arrays.asList(null, targetSavepointDirectory, null).iterator());
+		final TestSavepointHandlers.TestSavepointHandler savepointHandler =
+			testSavepointHandlers.new TestSavepointHandler(Arrays.asList(
+				new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo(
+					testSavepointHandlers.testSavepointTriggerId,
+					"/other/savepoint-0d2fb9-8d5e0106041a",
+					null)),
+				new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo(
+					testSavepointHandlers.testSavepointTriggerId,
+					"/tmp/savepoint-0d2fb9-8d5e0106041a",
+					null)),
+				new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo(
+					testSavepointHandlers.testSavepointTriggerId,
+					null,
+					new SerializedThrowable(new RuntimeException("expected"))))).iterator());
+		try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+			triggerHandler,
+			savepointHandler)) {
+
 			JobID id = new JobID();
 			{
 				CompletableFuture<String> savepointPathFuture = restClusterClient.triggerSavepoint(id, null);
 				String savepointPath = savepointPathFuture.get();
-				Assert.assertEquals("/universe", savepointPath);
+				assertEquals("/other/savepoint-0d2fb9-8d5e0106041a", savepointPath);
 			}
 
 			{
 				CompletableFuture<String> savepointPathFuture = restClusterClient.triggerSavepoint(id, targetSavepointDirectory);
 				String savepointPath = savepointPathFuture.get();
-				Assert.assertEquals(targetSavepointDirectory + "/universe", savepointPath);
+				assertEquals("/tmp/savepoint-0d2fb9-8d5e0106041a", savepointPath);
--- End diff --

done
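The rewritten test feeds its stub handlers with iterators, so that each successive request is checked against, and answered by, the next prepared entry: null, "/tmp", null for the requested directories, and three SavepointResponseBody values for the results. A minimal sketch of that scripted-stub pattern without the REST machinery; ScriptedSavepointStub is a hypothetical class, and plain strings replace the request and response bodies.

```java
import java.util.Iterator;
import java.util.Objects;

// Hypothetical test stub: replays canned responses in order and checks that
// each incoming request carries the next expected target directory.
class ScriptedSavepointStub {
    private final Iterator<String> expectedDirectories;
    private final Iterator<String> cannedLocations;

    ScriptedSavepointStub(Iterator<String> expectedDirectories, Iterator<String> cannedLocations) {
        this.expectedDirectories = expectedDirectories;
        this.cannedLocations = cannedLocations;
    }

    String handle(String requestedDirectory) {
        String expected = expectedDirectories.next();
        // Objects.equals treats two nulls as equal, so a request without a
        // directory matches a null expectation.
        if (!Objects.equals(expected, requestedDirectory)) {
            throw new AssertionError("Expected " + expected + " but got " + requestedDirectory);
        }
        return cannedLocations.next();
    }
}
```

For example, with Arrays.asList(null, "/tmp").iterator() as the expectations, the first call accepts only a request without a directory and the second only "/tmp"; a third call throws NoSuchElementException because the iterators are exhausted.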




[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324079#comment-16324079 ]

ASF GitHub Bot commented on FLINK-8317:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161242312
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointInfoTest.java ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.savepoints;
+
+import org.apache.flink.util.SerializedThrowable;
+
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link SavepointInfo}.
+ */
+public class SavepointInfoTest {
--- End diff --

fixed
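In the RestClusterClientTest diff, each SavepointInfo is built with either a location or a SerializedThrowable, never both, and SavepointInfoTest imports fail, which hints that the constructor rejects invalid combinations. A sketch of such an invariant, assuming the rule is "exactly one of the two must be set"; SavepointOutcome is a hypothetical stand-in, and the actual checks in SavepointInfo are not visible in this excerpt.

```java
// Hypothetical value class: a finished savepoint has either a location or a
// failure cause, never both and never neither.
final class SavepointOutcome {
    private final String location;        // null if the savepoint failed
    private final Throwable failureCause; // null if the savepoint succeeded

    SavepointOutcome(String location, Throwable failureCause) {
        // True when both are null or both are non-null: reject either case.
        if ((location == null) == (failureCause == null)) {
            throw new IllegalArgumentException(
                "Exactly one of location and failureCause must be set");
        }
        this.location = location;
        this.failureCause = failureCause;
    }

    boolean isSuccess() {
        return location != null;
    }

    String getLocation() {
        return location;
    }

    Throwable getFailureCause() {
        return failureCause;
    }
}
```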




[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324081#comment-16324081 ]

ASF GitHub Bot commented on FLINK-8317:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161242333
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java ---
@@ -0,0 +1,277 @@
+/**
+ * Test for {@link SavepointHandlers}.
+ */
+public class SavepointHandlersTest {
--- End diff --

fixed



[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324077#comment-16324077
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161242245
  
--- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java ---
@@ -294,44 +306,108 @@ public void testSubmitJobAndWaitForExecutionResult() throws Exception {
 
@Test
public void testTriggerSavepoint() throws Exception {
-   String targetSavepointDirectory = "/alternate";
-   TestSavepointTriggerHandler triggerHandler = new TestSavepointTriggerHandler(targetSavepointDirectory);
-   try (TestRestServerEndpoint ignored = createRestServerEndpoint(triggerHandler)) {
+   final String targetSavepointDirectory = "/tmp";
+   final TestSavepointHandlers testSavepointHandlers = new TestSavepointHandlers();
+   final TestSavepointHandlers.TestSavepointTriggerHandler triggerHandler =
+   testSavepointHandlers.new TestSavepointTriggerHandler(
+   Arrays.asList(null, targetSavepointDirectory, null).iterator());
+   final TestSavepointHandlers.TestSavepointHandler savepointHandler =
+   testSavepointHandlers.new TestSavepointHandler(Arrays.asList(
+   new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo(
+   testSavepointHandlers.testSavepointTriggerId,
+   "/other/savepoint-0d2fb9-8d5e0106041a",
+   null)),
+   new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo(
+   testSavepointHandlers.testSavepointTriggerId,
+   "/tmp/savepoint-0d2fb9-8d5e0106041a",
+   null)),
+   new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo(
+   testSavepointHandlers.testSavepointTriggerId,
+   null,
+   new SerializedThrowable(new RuntimeException("expected"))))).iterator());
+   try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+   triggerHandler,
+   savepointHandler)) {
+
JobID id = new JobID();
{
CompletableFuture savepointPathFuture = restClusterClient.triggerSavepoint(id, null);
String savepointPath = savepointPathFuture.get();
-   Assert.assertEquals("/universe", savepointPath);
+   assertEquals("/other/savepoint-0d2fb9-8d5e0106041a", savepointPath);
--- End diff --

done
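The rewritten test gives each test handler an `Iterator` of canned values. Consecutive REST calls therefore receive consecutive answers: no directory, then `/tmp`, then no directory for the trigger handler, and two locations followed by a failure for the status handler. The sketch below shows that stub pattern in plain Java. It is minimal and self-contained, and `CannedResponses` and `StubHandler` are hypothetical names, not the `TestSavepointHandlers` classes from the PR.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;

public class CannedResponses {

    /** Hypothetical stub: every call answers with the next canned value, in list order. */
    static final class StubHandler<T> {
        private final Iterator<T> responses;

        StubHandler(Iterator<T> responses) {
            this.responses = responses;
        }

        synchronized CompletableFuture<T> handleRequest() {
            if (!responses.hasNext()) {
                // More requests than canned answers: fail loudly instead of hanging.
                CompletableFuture<T> failed = new CompletableFuture<>();
                failed.completeExceptionally(new IllegalStateException("No more canned responses"));
                return failed;
            }
            return CompletableFuture.completedFuture(responses.next());
        }
    }

    public static void main(String[] args) {
        // Mirrors the trigger handler input: no directory, "/tmp", no directory.
        StubHandler<String> handler =
            new StubHandler<>(Arrays.asList(null, "/tmp", null).iterator());
        for (int i = 0; i < 3; i++) {
            System.out.println(handler.handleRequest().join());
        }
    }
}
```

Because the order of the canned values encodes the order of the test's requests, adding or reordering a request in the test also requires updating the list.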




[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324078#comment-16324078
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161242267
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -185,37 +205,19 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException {
}
}
 
-   private JobResult waitForJobExecutionResult(
-   final JobID jobId) throws ProgramInvocationException {
-
-   final JobMessageParameters messageParameters = new JobMessageParameters();
-   messageParameters.jobPathParameter.resolve(jobId);
-   JobExecutionResultResponseBody jobExecutionResultResponseBody;
-   try {
-   long attempt = 0;
-   do {
-   final CompletableFuture responseFuture =
-   restClient.sendRequest(
-   restClusterClientConfiguration.getRestServerAddress(),
-   restClusterClientConfiguration.getRestServerPort(),
-   JobExecutionResultHeaders.getInstance(),
-   messageParameters);
-   jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
-   Thread.sleep(waitStrategy.sleepTime(attempt));
-   attempt++;
-   }
-   while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED);
-   } catch (IOException | TimeoutException | ExecutionException e) {
-   throw new ProgramInvocationException(e);
-   } catch (InterruptedException e) {
-   Thread.currentThread().interrupt();
-   throw new ProgramInvocationException(e);
+   private  R waitForResource(
+   final SupplierWithException resourceFutureSupplier)
+   throws IOException, InterruptedException, ExecutionException, TimeoutException {
+   T asynchronouslyCreatedResource;
+   long attempt = 0;
+   do {
+   final CompletableFuture responseFuture = resourceFutureSupplier.get();
+   asynchronouslyCreatedResource = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+   Thread.sleep(waitStrategy.sleepTime(attempt));
--- End diff --

fixed
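In the loop quoted above, `Thread.sleep` runs after every response, including the one that already reports completion. The sketch below shows one way to arrange such a polling loop so that it returns as soon as a completed response arrives and sleeps only between unfinished attempts. It is minimal and self-contained. The `Predicate` completion check and the `LongUnaryOperator` back-off are assumptions of the sketch, not the `RestClusterClient` or `WaitStrategy` API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongUnaryOperator;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class PollingSketch {

    /**
     * Re-requests a resource until it reports completion. Sleeps only between
     * unfinished attempts, so a completed response is returned immediately.
     */
    static <T> T waitForResource(
            Supplier<CompletableFuture<T>> resourceFutureSupplier,
            Predicate<T> isCompleted,
            LongUnaryOperator sleepMillisForAttempt,
            long requestTimeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        long attempt = 0;
        while (true) {
            T resource = resourceFutureSupplier.get().get(requestTimeoutMillis, TimeUnit.MILLISECONDS);
            if (isCompleted.test(resource)) {
                return resource;
            }
            Thread.sleep(sleepMillisForAttempt.applyAsLong(attempt));
            attempt++;
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger requests = new AtomicInteger();
        // Pretend the server reports "completed" on the third poll.
        Integer polls = PollingSketch.<Integer>waitForResource(
            () -> CompletableFuture.completedFuture(requests.incrementAndGet()),
            count -> count >= 3,
            attempt -> Math.min(10L * (attempt + 1), 100L), // linear back-off, capped at 100 ms
            1_000L);
        System.out.println("completed after " + polls + " polls");
    }
}
```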




[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324039#comment-16324039
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161238338
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java ---
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.savepoints;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.NotFoundException;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
+import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.SerializedThrowable;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.Immutable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * HTTP handlers for asynchronous triggering of savepoints.
+ *
+ * Drawing savepoints is a potentially long-running operation. To avoid blocking HTTP
+ * connections, savepoints must be drawn in two steps. First, an HTTP request is issued to trigger
+ * the savepoint asynchronously. The request will be assigned a {@link SavepointTriggerId},
+ * which is returned in the response body. Next, the returned id should be used to poll the status
+ * of the savepoint until it is finished.
+ *
+ * A savepoint is triggered by sending an HTTP {@code POST} request to
+ * {@code /jobs/:jobid/savepoints}. The HTTP request may contain a JSON body to specify the target
+ * directory of the savepoint, e.g.,
+ 
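The Javadoc above describes a two-step protocol: trigger, receive an id, then poll by id. It can be sketched without any Flink classes. In this minimal, self-contained sketch, a `ConcurrentHashMap` keyed by a random UUID stands in for `SavepointTriggerId`. All class and method names are hypothetical, and the sketch makes no claim about how `SavepointHandlers` stores its state or which HTTP status it returns for an unknown id.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class TwoStepSavepoints {

    /** Hypothetical status values; the real REST messages use QueueStatus. */
    enum Status { IN_PROGRESS, COMPLETED }

    /** Hypothetical poll result: a location on success, or a failure cause. */
    static final class SavepointStatus {
        final Status status;
        final String location;   // null unless completed successfully
        final Throwable failure; // null unless completed exceptionally

        SavepointStatus(Status status, String location, Throwable failure) {
            this.status = status;
            this.location = location;
            this.failure = failure;
        }
    }

    private final Map<String, CompletableFuture<String>> inFlight = new ConcurrentHashMap<>();

    /** Step 1 (POST): start the savepoint and return a trigger id right away. */
    String trigger(Supplier<CompletableFuture<String>> savepointAction) {
        String triggerId = UUID.randomUUID().toString();
        inFlight.put(triggerId, savepointAction.get());
        return triggerId;
    }

    /** Step 2 (GET): report progress for a trigger id without blocking. */
    Optional<SavepointStatus> poll(String triggerId) {
        CompletableFuture<String> future = inFlight.get(triggerId);
        if (future == null) {
            return Optional.empty(); // unknown trigger id
        }
        if (!future.isDone()) {
            return Optional.of(new SavepointStatus(Status.IN_PROGRESS, null, null));
        }
        try {
            return Optional.of(new SavepointStatus(Status.COMPLETED, future.join(), null));
        } catch (CompletionException e) {
            // join() wraps the original failure; unwrap it for the caller.
            return Optional.of(new SavepointStatus(Status.COMPLETED, null, e.getCause()));
        }
    }
}
```

A long-running server would also need an eviction policy for finished entries; this sketch keeps them indefinitely for simplicity.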

[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324032#comment-16324032
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161236268
  

[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324024#comment-16324024
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161235482
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java ---
@@ -137,4 +138,21 @@
JobID jobId, @RpcTimeout Time timeout) {
throw new UnsupportedOperationException();
}
+
+   /**
+* Triggers a savepoint with the given savepoint directory as a target.
+*
+* @param targetDirectory Target directory for the savepoint.
+* @param timeout for the asynchronous operation
+* @return A future to the completed checkpoint
+* @throws IllegalStateException If no savepoint directory has been
+*   specified and no default savepoint directory has been
+*   configured
+*/
+   default CompletableFuture triggerSavepoint(
--- End diff --

I can make it return the external pointer. 
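The reply proposes returning the savepoint's external pointer instead of the completed checkpoint. The sketch below shows that shape, assuming a `CompletableFuture<String>` return type and mirroring the `UnsupportedOperationException` default of the gateway method quoted above. It is minimal and self-contained. The interface, the class, and the `savepoint-1` file name are hypothetical. Unlike the `@throws` clause in the quoted Javadoc, this sketch reports a missing directory by completing the future exceptionally with `IllegalStateException`.

```java
import java.util.concurrent.CompletableFuture;

public class GatewaySketch {

    /** Hypothetical stand-in for a gateway interface; not Flink's RestfulGateway. */
    interface SavepointGateway {
        /**
         * Triggers a savepoint and returns its external pointer (the savepoint path).
         * Gateways that cannot take savepoints inherit this throwing default.
         */
        default CompletableFuture<String> triggerSavepoint(String targetDirectory) {
            throw new UnsupportedOperationException();
        }
    }

    /** Hypothetical gateway that falls back to a configured default directory. */
    static final class DirectoryGateway implements SavepointGateway {
        private final String defaultDirectory; // null means no default configured

        DirectoryGateway(String defaultDirectory) {
            this.defaultDirectory = defaultDirectory;
        }

        @Override
        public CompletableFuture<String> triggerSavepoint(String targetDirectory) {
            String directory = targetDirectory != null ? targetDirectory : defaultDirectory;
            if (directory == null) {
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(new IllegalStateException(
                    "No savepoint directory specified and no default configured"));
                return failed;
            }
            // Hypothetical fixed file name; a real savepoint path would be generated.
            return CompletableFuture.completedFuture(directory + "/savepoint-1");
        }
    }
}
```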




[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324022#comment-16324022
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161234791
  
--- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java ---
@@ -185,37 +205,19 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException {
}
}
 
-   private JobResult waitForJobExecutionResult(
-   final JobID jobId) throws ProgramInvocationException {
-
-   final JobMessageParameters messageParameters = new JobMessageParameters();
-   messageParameters.jobPathParameter.resolve(jobId);
-   JobExecutionResultResponseBody jobExecutionResultResponseBody;
-   try {
-   long attempt = 0;
-   do {
-   final CompletableFuture responseFuture =
-   restClient.sendRequest(
-   restClusterClientConfiguration.getRestServerAddress(),
-   restClusterClientConfiguration.getRestServerPort(),
-   JobExecutionResultHeaders.getInstance(),
-   messageParameters);
-   jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
-   Thread.sleep(waitStrategy.sleepTime(attempt));
-   attempt++;
-   }
-   while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED);
-   } catch (IOException | TimeoutException | ExecutionException e) {
-   throw new ProgramInvocationException(e);
-   } catch (InterruptedException e) {
-   Thread.currentThread().interrupt();
-   throw new ProgramInvocationException(e);
+   private  R waitForResource(
+   final SupplierWithException resourceFutureSupplier)
+   throws IOException, InterruptedException, ExecutionException, TimeoutException {
+   T asynchronouslyCreatedResource;
+   long attempt = 0;
+   do {
+   final CompletableFuture responseFuture = resourceFutureSupplier.get();
+   asynchronouslyCreatedResource = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+   Thread.sleep(waitStrategy.sleepTime(attempt));
--- End diff --

True




[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324008#comment-16324008
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161228957
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java ---
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.savepoints;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
+import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for {@link SavepointHandlers}.
+ */
+public class SavepointHandlersTest {
--- End diff --

Missing `extends TestLogger`



[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323999#comment-16323999
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161220157
  
--- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java ---
@@ -294,44 +306,108 @@ public void testSubmitJobAndWaitForExecutionResult() throws Exception {
 
@Test
public void testTriggerSavepoint() throws Exception {
-   String targetSavepointDirectory = "/alternate";
-   TestSavepointTriggerHandler triggerHandler = new TestSavepointTriggerHandler(targetSavepointDirectory);
-   try (TestRestServerEndpoint ignored = createRestServerEndpoint(triggerHandler)) {
+   final String targetSavepointDirectory = "/tmp";
+   final TestSavepointHandlers testSavepointHandlers = new TestSavepointHandlers();
+   final TestSavepointHandlers.TestSavepointTriggerHandler triggerHandler =
+   testSavepointHandlers.new TestSavepointTriggerHandler(
+   Arrays.asList(null, targetSavepointDirectory, null).iterator());
+   final TestSavepointHandlers.TestSavepointHandler savepointHandler =
+   testSavepointHandlers.new TestSavepointHandler(Arrays.asList(
+   new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo(
+   testSavepointHandlers.testSavepointTriggerId,
+   "/other/savepoint-0d2fb9-8d5e0106041a",
+   null)),
+   new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo(
+   testSavepointHandlers.testSavepointTriggerId,
+   "/tmp/savepoint-0d2fb9-8d5e0106041a",
+   null)),
+   new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo(
+   testSavepointHandlers.testSavepointTriggerId,
+   null,
+   new SerializedThrowable(new RuntimeException("expected"))))).iterator());
+   try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+   triggerHandler,
+   savepointHandler)) {
+
JobID id = new JobID();
{
CompletableFuture savepointPathFuture = restClusterClient.triggerSavepoint(id, null);
String savepointPath = savepointPathFuture.get();
-   Assert.assertEquals("/universe", savepointPath);
+   assertEquals("/other/savepoint-0d2fb9-8d5e0106041a", savepointPath);
}
 
{
CompletableFuture savepointPathFuture = restClusterClient.triggerSavepoint(id, targetSavepointDirectory);
String savepointPath = savepointPathFuture.get();
-   Assert.assertEquals(targetSavepointDirectory + "/universe", savepointPath);
+   assertEquals("/tmp/savepoint-0d2fb9-8d5e0106041a", savepointPath);
--- End diff --

Same here.




[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324005#comment-16324005
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161220129
  
--- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java ---
@@ -294,44 +306,108 @@ public void testSubmitJobAndWaitForExecutionResult() throws Exception {
 
@Test
public void testTriggerSavepoint() throws Exception {
-   String targetSavepointDirectory = "/alternate";
-   TestSavepointTriggerHandler triggerHandler = new TestSavepointTriggerHandler(targetSavepointDirectory);
-   try (TestRestServerEndpoint ignored = createRestServerEndpoint(triggerHandler)) {
+   final String targetSavepointDirectory = "/tmp";
+   final TestSavepointHandlers testSavepointHandlers = new TestSavepointHandlers();
+   final TestSavepointHandlers.TestSavepointTriggerHandler triggerHandler =
+   testSavepointHandlers.new TestSavepointTriggerHandler(
+   Arrays.asList(null, targetSavepointDirectory, null).iterator());
+   final TestSavepointHandlers.TestSavepointHandler savepointHandler =
+   testSavepointHandlers.new TestSavepointHandler(Arrays.asList(
+   new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo(
+   testSavepointHandlers.testSavepointTriggerId,
+   "/other/savepoint-0d2fb9-8d5e0106041a",
+   null)),
+   new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo(
+   testSavepointHandlers.testSavepointTriggerId,
+   "/tmp/savepoint-0d2fb9-8d5e0106041a",
+   null)),
+   new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo(
+   testSavepointHandlers.testSavepointTriggerId,
+   null,
+   new SerializedThrowable(new RuntimeException("expected"))))).iterator());
+   try (TestRestServerEndpoint ignored = createRestServerEndpoint(
+   triggerHandler,
+   savepointHandler)) {
+
JobID id = new JobID();
{
CompletableFuture savepointPathFuture = restClusterClient.triggerSavepoint(id, null);
String savepointPath = savepointPathFuture.get();
-   Assert.assertEquals("/universe", savepointPath);
+   assertEquals("/other/savepoint-0d2fb9-8d5e0106041a", savepointPath);
--- End diff --

Maybe we could declare a variable with this value. Makes it easier to refactor later on.
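The sketch below applies this suggestion with hypothetical constant names. The directories and the savepoint name are declared once, so the canned responses and the assertions share one source of truth. Changing `/tmp` then cannot silently leave an expected path out of sync. The literal values are the ones used in `testTriggerSavepoint`.

```java
import java.util.Arrays;
import java.util.List;

public class ExpectedPaths {
    // Hypothetical names; the literal values come from testTriggerSavepoint.
    static final String SAVEPOINT_NAME = "savepoint-0d2fb9-8d5e0106041a";
    static final String DEFAULT_SAVEPOINT_DIRECTORY = "/other";
    static final String TARGET_SAVEPOINT_DIRECTORY = "/tmp";

    // Derived expectations stay in sync with the directories they are built from.
    static final String DEFAULT_SAVEPOINT_PATH = DEFAULT_SAVEPOINT_DIRECTORY + "/" + SAVEPOINT_NAME;
    static final String TARGET_SAVEPOINT_PATH = TARGET_SAVEPOINT_DIRECTORY + "/" + SAVEPOINT_NAME;

    /** Locations a stub status handler could report, in request order. */
    static List<String> cannedLocations() {
        return Arrays.asList(DEFAULT_SAVEPOINT_PATH, TARGET_SAVEPOINT_PATH);
    }
}
```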


> Enable Triggering of Savepoints via RestfulGateway
> --
>
> Key: FLINK-8317
> URL: https://issues.apache.org/jira/browse/FLINK-8317
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Enable triggering of savepoints in FLIP-6 mode via RestfulGateway:
> * Add method {{CompletableFuture triggerSavepoint(long timestamp, String targetDirectory)}} to the
> {{RestfulGateway}} interface
> * Implement method in {{Dispatcher}} and {{JobMaster}}
> * Implement a new {{AbstractRestHandler}} which allows asynchronous triggering of savepoints



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324002#comment-16324002
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161228758
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java ---
@@ -27,11 +27,16 @@
 import java.util.concurrent.TimeoutException;
 
 /**
- * Utility functions for Flink's RPC implementation
+ * Utility functions for Flink's RPC implementation.
  */
 public class RpcUtils {
 
-   public static final Time INF_TIMEOUT = Time.milliseconds(Long.MAX_VALUE);
+   /**
+* HACK: Set to 21474835 seconds, Akka's maximum delay (Akka 2.4.20). The value cannot be
+* higher or an {@link IllegalArgumentException} will be thrown during an RPC. Check the private
+* method {@code checkMaxDelay()} in {@link akka.actor.LightArrayRevolverScheduler}.
--- End diff --

Good catch :-)



[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324003#comment-16324003
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161224329
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
 ---
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.savepoints;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.NotFoundException;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody;
+import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.types.Either;
+import org.apache.flink.util.SerializedThrowable;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.Immutable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * HTTP handlers for asynchronous triggering of savepoints.
+ *
+ * Drawing savepoints is a potentially long-running operation. To avoid blocking HTTP
+ * connections, savepoints must be drawn in two steps. First, an HTTP request is issued to trigger
+ * the savepoint asynchronously. The request will be assigned a {@link SavepointTriggerId},
+ * which is returned in the response body. Next, the returned id should be used to poll the status
+ * of the savepoint until it is finished.
+ *
+ * A savepoint is triggered by sending an HTTP {@code POST} request to
+ * {@code /jobs/:jobid/savepoints}. The HTTP request may contain a JSON body to specify the target
+ * directory of the savepoint, 

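The two-step protocol described in that Javadoc (trigger, receive an id, then poll with the id) can be sketched in plain Java. This is a simplified in-memory model under stated assumptions: `AsyncSavepointService`, `trigger` and `status` are hypothetical names, the trigger id is a random UUID rather than a `SavepointTriggerId`, and the savepoint itself is represented by a `CompletableFuture<String>` that eventually holds its location.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of trigger-then-poll; not Flink's handler code.
class AsyncSavepointService {

    enum Status { IN_PROGRESS, COMPLETED }

    // Result of one status poll; location stays null while the savepoint runs.
    static final class StatusResponse {
        final Status status;
        final String location;

        StatusResponse(Status status, String location) {
            this.status = status;
            this.location = location;
        }
    }

    private final Map<String, CompletableFuture<String>> inFlight = new ConcurrentHashMap<>();

    // Step 1: register the running savepoint and hand back an id immediately.
    String trigger(CompletableFuture<String> savepointLocationFuture) {
        String triggerId = UUID.randomUUID().toString();
        inFlight.put(triggerId, savepointLocationFuture);
        return triggerId;
    }

    // Step 2: report progress for an id returned by trigger().
    StatusResponse status(String triggerId) {
        CompletableFuture<String> future = inFlight.get(triggerId);
        if (future == null) {
            throw new IllegalArgumentException("Unknown trigger id: " + triggerId);
        }
        if (!future.isDone()) {
            return new StatusResponse(Status.IN_PROGRESS, null);
        }
        // Failures are not modelled; join() would rethrow an exceptional completion.
        return new StatusResponse(Status.COMPLETED, future.join());
    }
}
```

Because `trigger` returns before the future completes, no connection is held open while the savepoint is drawn; the client simply repeats `status` until it reports completion.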
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324004#comment-16324004
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161224559
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java
 ---

[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324007#comment-16324007
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161230839
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
 ---
@@ -137,4 +138,21 @@
JobID jobId, @RpcTimeout Time timeout) {
throw new UnsupportedOperationException();
}
+
+   /**
+* Triggers a savepoint with the given savepoint directory as a target.
+*
+* @param targetDirectory Target directory for the savepoint.
+* @param timeout for the asynchronous operation
+* @return A future to the completed checkpoint
+* @throws IllegalStateException If no savepoint directory has been
+*   specified and no default savepoint directory has been
+*   configured
+*/
+   default CompletableFuture triggerSavepoint(
--- End diff --

I'm wondering whether we actually need to expose the `CompletedCheckpoint`. 
Wouldn't it be enough to return the external path here? That way we would avoid 
further coupling between external components and internal data structures.
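One way to follow that suggestion is to reduce the internal result to its path before it leaves the component. The sketch assumes hypothetical types (`InternalCheckpoint`, `PathOnlyGateway`, `PathOnlyGatewayAdapter`) in place of `CompletedCheckpoint` and `RestfulGateway`; only `CompletableFuture.thenApply` is real JDK API here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in for an internal checkpoint type.
final class InternalCheckpoint {
    private final long checkpointId;
    private final String externalPointer;

    InternalCheckpoint(long checkpointId, String externalPointer) {
        this.checkpointId = checkpointId;
        this.externalPointer = externalPointer;
    }

    long getCheckpointId() {
        return checkpointId;
    }

    String getExternalPointer() {
        return externalPointer;
    }
}

// Hypothetical gateway contract that only promises the savepoint's external path.
interface PathOnlyGateway {
    CompletableFuture<String> triggerSavepoint(String targetDirectory);
}

final class PathOnlyGatewayAdapter implements PathOnlyGateway {
    // Whatever actually draws the savepoint internally.
    private final Function<String, CompletableFuture<InternalCheckpoint>> internalTrigger;

    PathOnlyGatewayAdapter(Function<String, CompletableFuture<InternalCheckpoint>> internalTrigger) {
        this.internalTrigger = internalTrigger;
    }

    @Override
    public CompletableFuture<String> triggerSavepoint(String targetDirectory) {
        // Map to the path before crossing the boundary, so callers never
        // depend on the internal type.
        return internalTrigger.apply(targetDirectory)
            .thenApply(InternalCheckpoint::getExternalPointer);
    }
}
```

The design trade-off is that REST callers lose access to other checkpoint metadata, which is exactly the decoupling the comment is after.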



[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324001#comment-16324001
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161229580
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java
 ---
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.savepoints;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
+import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for {@link SavepointHandlers}.
+ */
+public class SavepointHandlersTest {
+
+   private static final CompletableFuture LOCAL_REST_ADDRESS =
+   CompletableFuture.completedFuture("localhost:12345");
+
+   private static final Time TIMEOUT = Time.seconds(10);
+
+   private static final JobID JOB_ID = new JobID();
+
+   private static final String COMPLETED_CHECKPOINT_EXTERNAL_POINTER = "/tmp/savepoint-0d2fb9-8d5e0106041a";
+
+   private static final String DEFAULT_REQUESTED_SAVEPOINT_TARGET_DIRECTORY = "/tmp";
+
+   @Mock
+   private RestfulGateway mockRestfulGateway;
+
+   private SavepointHandlers savepointHandlers;
+
+   private SavepointHandlers.SavepointTriggerHandler savepointTriggerHandler;
+
+   private SavepointHandlers.SavepointStatusHandler savepointStatusHandler;
+
+   private CompletedCheckpoint 

[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324006#comment-16324006
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161230366
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointInfoTest.java
 ---
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages.job.savepoints;
+
+import org.apache.flink.util.SerializedThrowable;
+
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link SavepointInfo}.
+ */
+public class SavepointInfoTest {
--- End diff --

`extends TestLogger`



[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324000#comment-16324000
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161218205
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 ---
@@ -185,37 +205,19 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException {
}
}
 
-   private JobResult waitForJobExecutionResult(
-   final JobID jobId) throws ProgramInvocationException {
-
-   final JobMessageParameters messageParameters = new JobMessageParameters();
-   messageParameters.jobPathParameter.resolve(jobId);
-   JobExecutionResultResponseBody jobExecutionResultResponseBody;
-   try {
-   long attempt = 0;
-   do {
-   final CompletableFuture responseFuture =
-   restClient.sendRequest(
-   restClusterClientConfiguration.getRestServerAddress(),
-   restClusterClientConfiguration.getRestServerPort(),
-   JobExecutionResultHeaders.getInstance(),
-   messageParameters);
-   jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
-   Thread.sleep(waitStrategy.sleepTime(attempt));
-   attempt++;
-   }
-   while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED);
-   } catch (IOException | TimeoutException | ExecutionException e) {
-   throw new ProgramInvocationException(e);
-   } catch (InterruptedException e) {
-   Thread.currentThread().interrupt();
-   throw new ProgramInvocationException(e);
+   private  R waitForResource(
+   final SupplierWithException resourceFutureSupplier)
+   throws IOException, InterruptedException, ExecutionException, TimeoutException {
+   T asynchronouslyCreatedResource;
+   long attempt = 0;
+   do {
+   final CompletableFuture responseFuture = resourceFutureSupplier.get();
+   asynchronouslyCreatedResource = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
+   Thread.sleep(waitStrategy.sleepTime(attempt));
--- End diff --

I think that we don't have to wait if 
`asynchronouslyCreatedResource.queueStatus().getId() == 
QueueStatus.Id.COMPLETED`
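A sketch of the polling loop with that change applied: the completion check runs before the sleep, so a finished resource is returned without one more back-off. `PolledResource`, `Poller` and the simplified parameter types are hypothetical; the generic signature in the diff above is not reproduced.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;

// Hypothetical view of an asynchronously created resource.
interface PolledResource<R> {
    boolean isCompleted();

    R resource();
}

final class Poller {
    static <R> R waitForResource(
            Supplier<CompletableFuture<? extends PolledResource<R>>> poll,
            LongUnaryOperator sleepTimeMillis,
            long timeoutMillis)
            throws InterruptedException, ExecutionException, TimeoutException {
        long attempt = 0;
        while (true) {
            PolledResource<R> response = poll.get().get(timeoutMillis, TimeUnit.MILLISECONDS);
            if (response.isCompleted()) {
                // Finished: return immediately instead of sleeping once more.
                return response.resource();
            }
            // Still in progress: back off according to the wait strategy.
            Thread.sleep(sleepTimeMillis.applyAsLong(attempt));
            attempt++;
        }
    }
}
```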



[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323700#comment-16323700
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161163563
  
--- Diff: 
flink-clients/src/test/java/org/apache/flink/client/program/rest/retry/ExponentialWaitStrategyTest.java
 ---
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program.rest.retry;
+
+import org.junit.Test;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for {@link ExponentialWaitStrategy}.
+ */
+public class ExponentialWaitStrategyTest {
--- End diff --

done
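For context on what such a test exercises, here is a minimal exponential back-off sketch. It illustrates the general technique (wait = initial * 2^attempt, capped at a maximum) under assumed validation rules; `ExponentialBackoff` is a hypothetical name and this is not the `ExponentialWaitStrategy` code from the pull request.

```java
// Hypothetical exponential back-off: initialWait * 2^attempt, capped at maxWait.
// Names and validation rules are assumptions, not Flink's ExponentialWaitStrategy.
final class ExponentialBackoff {
    private final long initialWaitMillis;
    private final long maxWaitMillis;

    ExponentialBackoff(long initialWaitMillis, long maxWaitMillis) {
        if (initialWaitMillis <= 0) {
            throw new IllegalArgumentException("initialWaitMillis must be positive");
        }
        if (maxWaitMillis < initialWaitMillis) {
            throw new IllegalArgumentException("maxWaitMillis must not be smaller than initialWaitMillis");
        }
        this.initialWaitMillis = initialWaitMillis;
        this.maxWaitMillis = maxWaitMillis;
    }

    long sleepTime(long attempt) {
        if (attempt < 0) {
            throw new IllegalArgumentException("attempt must not be negative: " + attempt);
        }
        long wait = initialWaitMillis;
        // Double once per attempt, stopping as soon as the cap is reached.
        for (long i = 0; i < attempt && wait < maxWaitMillis; i++) {
            // Jump straight to the cap when doubling would exceed it;
            // this also rules out long overflow.
            wait = wait > maxWaitMillis / 2 ? maxWaitMillis : wait * 2;
        }
        return wait;
    }
}
```

With `new ExponentialBackoff(100, 1000)`, attempts 0 to 3 yield 100, 200, 400 and 800 ms, and every later attempt yields the 1000 ms cap.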



[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323699#comment-16323699
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161163558
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java
 ---

[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323698#comment-16323698
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161163547
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java ---
@@ -27,11 +27,16 @@
 import java.util.concurrent.TimeoutException;
 
 /**
- * Utility functions for Flink's RPC implementation
+ * Utility functions for Flink's RPC implementation.
  */
 public class RpcUtils {
 
-   public static final Time INF_TIMEOUT = Time.milliseconds(Long.MAX_VALUE);
+   /**
+    * HACK: Set to 21474835 seconds, Akka's maximum delay (Akka 2.4.20). The value cannot be
+    * higher or an {@link IllegalArgumentException} will be thrown during an RPC. Check the private
+    * method {@code checkMaxDelay()} in {@link akka.actor.LightArrayRevolverScheduler}.
+    */
+   public static final Time INF_TIMEOUT = Time.milliseconds(TimeUnit.SECONDS.toMillis(21474835));
--- End diff --

done
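The HACK comment ties `INF_TIMEOUT` to the largest delay Akka's scheduler accepts. The sketch below reproduces that bound under two assumptions that should be checked against the Akka version in use: the scheduler rejects a delay once it exceeds `Integer.MAX_VALUE` ticks (reporting the limit with one second of rounding margin), and the tick duration is Akka's default of 10 ms. `AkkaMaxDelay` and its methods are hypothetical names, not part of Flink or Akka.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical helper (not a Flink or Akka API) mirroring the bound described in the
 * HACK comment. Assumes the scheduler rejects delays longer than Integer.MAX_VALUE ticks.
 */
final class AkkaMaxDelay {

    private AkkaMaxDelay() {
    }

    /** Largest delay in whole seconds, minus one second of rounding margin (assumed). */
    static long maxDelaySeconds(long tickDurationMillis) {
        long tickNanos = TimeUnit.MILLISECONDS.toNanos(tickDurationMillis);
        // With a 10 ms tick this is about 2.1e16 ns, well within the range of a long.
        long maxNanos = tickNanos * Integer.MAX_VALUE;
        return TimeUnit.NANOSECONDS.toSeconds(maxNanos) - 1;
    }

    /** True if the delay stays within Integer.MAX_VALUE ticks. */
    static boolean isAccepted(long delayMillis, long tickDurationMillis) {
        // toNanos saturates at Long.MAX_VALUE instead of overflowing.
        long delayNanos = TimeUnit.MILLISECONDS.toNanos(delayMillis);
        long tickNanos = TimeUnit.MILLISECONDS.toNanos(tickDurationMillis);
        return delayNanos / tickNanos <= Integer.MAX_VALUE;
    }
}
```

With a 10 ms tick, `maxDelaySeconds(10)` yields 21474835, the value used in the diff, and `isAccepted(Long.MAX_VALUE, 10)` is false, which is consistent with the old `Time.milliseconds(Long.MAX_VALUE)` being rejected.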




[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-09 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318563#comment-16318563
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r160431822
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java
 ---
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.savepoints;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody;
+import org.apache.flink.runtime.rest.messages.queue.QueueStatus;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Before;

[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308212#comment-16308212
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r159247245
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java ---
@@ -27,11 +27,16 @@
 import java.util.concurrent.TimeoutException;
 
 /**
- * Utility functions for Flink's RPC implementation
+ * Utility functions for Flink's RPC implementation.
  */
 public class RpcUtils {
 
-   public static final Time INF_TIMEOUT = Time.milliseconds(Long.MAX_VALUE);
+   /**
+    * HACK: Set to 21474835 seconds, Akka's maximum delay (Akka 2.4.20). The value cannot be
+    * higher or an {@link IllegalArgumentException} will be thrown during an RPC. Check the private
+    * method {@code checkMaxDelay()} in {@link akka.actor.LightArrayRevolverScheduler}.
+    */
+   public static final Time INF_TIMEOUT = Time.milliseconds(TimeUnit.SECONDS.toMillis(21474835));
--- End diff --

`Time.seconds(21474835)`
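The suggestion above replaces a manual unit conversion with a factory that names the unit directly. Flink's `Time` class is not reproduced here; `java.time.Duration` stands in for it, on the assumption that `Time.seconds(n)` and `Time.milliseconds(TimeUnit.SECONDS.toMillis(n))` describe the same span. `TimeoutForms` is a hypothetical name.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;

/** Hypothetical comparison of two ways of writing the Akka-bounded timeout. */
final class TimeoutForms {

    static final long AKKA_MAX_DELAY_SECONDS = 21474835L;

    private TimeoutForms() {
    }

    /** Form used in the diff: convert seconds to milliseconds by hand. */
    static Duration viaMillis() {
        return Duration.ofMillis(TimeUnit.SECONDS.toMillis(AKKA_MAX_DELAY_SECONDS));
    }

    /** Suggested form: state the unit directly. */
    static Duration viaSeconds() {
        return Duration.ofSeconds(AKKA_MAX_DELAY_SECONDS);
    }
}
```

Both forms denote the same duration; the second simply avoids the intermediate conversion.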


> Enable Triggering of Savepoints via RestfulGateway
> --
>
> Key: FLINK-8317
> URL: https://issues.apache.org/jira/browse/FLINK-8317
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Enable triggering of savepoints in FLIP-6 mode via RestfulGateway:
> * Add method {{CompletableFuture triggerSavepoint(long timestamp, String targetDirectory)}} to the {{RestfulGateway}} interface
> * Implement method in {{Dispatcher}} and {{JobMaster}}
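The two bullets above describe a gateway method that returns a future, and a `Dispatcher` that serves it by delegating to the job's `JobMaster`. A minimal sketch of that delegation follows. It assumes the future completes with the savepoint path as a `String` and that the dispatcher looks jobs up by an ID; `SavepointTrigger`, `DispatcherSketch` and the string job IDs are hypothetical, not Flink's actual types.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical per-job endpoint; the String result (savepoint path) is an assumption. */
interface SavepointTrigger {
    CompletableFuture<String> triggerSavepoint(long timestamp, String targetDirectory);
}

/** Hypothetical dispatcher that routes the call to the endpoint of the requested job. */
final class DispatcherSketch {

    private final Map<String, SavepointTrigger> jobMasters = new ConcurrentHashMap<>();

    void register(String jobId, SavepointTrigger jobMaster) {
        jobMasters.put(jobId, jobMaster);
    }

    CompletableFuture<String> triggerSavepoint(String jobId, long timestamp, String targetDirectory) {
        SavepointTrigger jobMaster = jobMasters.get(jobId);
        if (jobMaster == null) {
            // Report an unknown job through the future instead of throwing synchronously.
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalArgumentException("Unknown job " + jobId));
            return failed;
        }
        // Forward without blocking; the caller receives the job master's future.
        return jobMaster.triggerSavepoint(timestamp, targetDirectory);
    }
}
```

Returning a failed future for an unknown job keeps every outcome on the asynchronous path, so callers such as a REST handler only need to handle one kind of result.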





[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2018-01-01 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307518#comment-16307518
 ] 

ASF GitHub Bot commented on FLINK-8317:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5223

[FLINK-8317][flip6] Implement Triggering of Savepoints

## What is the purpose of the change

*Implement triggering of savepoints through HTTP and through the command line in FLIP-6 mode. This PR is based on #5207.*

CC: @tillrohrmann 

## Brief change log

- *Allow triggering of savepoints through RestfulGateway.*
- *Implement REST handlers to trigger and query the status of savepoints.*
- *Implement savepoint command in RestClusterClient.*


## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests for the REST handlers and `RestClusterClient`*
  - *Manually deployed the `SocketWindowWordCount` job and triggered a savepoint using Flink's command line client and `curl`*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8317-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5223.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5223


commit e91f15fcbbe52d6d47cc1ba3d35ae4768fc6309d
Author: gyao 
Date:   2017-12-19T17:58:53Z

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

- Introduce a new JobExecutionResult used by JobMaster to forward the information in
  the already existing JobExecutionResult.
- Always cache a JobExecutionResult, even in case of job failures. In case of
  job failures, the serialized exception is stored additionally.
- Introduce new methods to RestfulGateway to allow retrieval of cached
  JobExecutionResults
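The commit above caches a result for every finished job and, for failed jobs, additionally keeps the exception in serialized form. A minimal sketch of such a cache, assuming Java serialization for the exception and plain string job IDs; `JobResultCache` and `CachedJobResult` are hypothetical names, not Flink classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical cached result; a failure, if any, is kept only in serialized form. */
final class CachedJobResult {
    final String jobId;
    final long netRuntimeMillis;
    final byte[] serializedFailure; // null when the job finished successfully

    CachedJobResult(String jobId, long netRuntimeMillis, byte[] serializedFailure) {
        this.jobId = jobId;
        this.netRuntimeMillis = netRuntimeMillis;
        this.serializedFailure = serializedFailure;
    }

    boolean isSuccess() {
        return serializedFailure == null;
    }
}

/** Hypothetical cache that stores a result for every finished job, failed or not. */
final class JobResultCache {

    private final Map<String, CachedJobResult> results = new ConcurrentHashMap<>();

    void onJobFinished(String jobId, long netRuntimeMillis, Throwable failure) {
        byte[] serialized = failure == null ? null : serialize(failure);
        results.put(jobId, new CachedJobResult(jobId, netRuntimeMillis, serialized));
    }

    Optional<CachedJobResult> get(String jobId) {
        return Optional.ofNullable(results.get(jobId));
    }

    private static byte[] serialize(Throwable failure) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(failure);
        } catch (IOException e) {
            throw new IllegalStateException("Could not serialize the job failure", e);
        }
        // Closing the ObjectOutputStream flushed everything into the byte array.
        return bytes.toByteArray();
    }
}
```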

commit 748745ac3521a20040cbda4056dfd9c53bc24a82
Author: gyao 
Date:   2017-12-20T13:44:03Z

[FLINK-8233][flip6] Add JobExecutionResultHandler

- Allow retrieval of the JobExecutionResult cached in Dispatcher.
- Implement serializer and deserializer for JobExecutionResult.

commit adf091a2770f42d6f8a0c19ab88cc7a208943a32
Author: gyao 
Date:   2017-12-20T13:44:26Z

[hotfix] Clean up ExecutionGraph

- Remove unnecessary throws clause.
- Format whitespace.

commit f5c28527b3a1a0c8ec52f2a5616ebb634397b69c
Author: gyao 
Date:   2017-12-22T23:02:10Z

[FLINK-8299][flip6] Retrieve JobExecutionResult after job submission

commit 55d920f628d7ef3f5b0db7fd843dfdd2d96a3917
Author: gyao 
Date:   2018-01-01T17:59:42Z

[FLINK-8317][flip6] Implement savepoints in RestClusterClient

Allow triggering of savepoints through RestfulGateway. Implement REST handlers
to trigger and query the status of savepoints. Implement
savepoint command in RestClusterClient.
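On the client side, a savepoint command built on these handlers has to trigger the savepoint and then poll its status until a location is available. The sketch below shows only the polling loop, with a fixed pause and a bounded number of attempts; `SavepointPoller` and its parameters are hypothetical, and the real `RestClusterClient` may retry differently.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

/** Hypothetical client-side helper: poll a status probe until a savepoint location appears. */
final class SavepointPoller {

    private SavepointPoller() {
    }

    /**
     * Calls statusProbe up to maxAttempts times, pausing between calls, and returns the
     * first location it reports. An empty Optional means "still in progress".
     */
    static String awaitLocation(Supplier<Optional<String>> statusProbe, int maxAttempts, Duration pause) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Optional<String> location = statusProbe.get();
            if (location.isPresent()) {
                return location.get();
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(pause.toMillis());
                } catch (InterruptedException e) {
                    // Restore the interrupt flag before giving up.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for the savepoint", e);
                }
            }
        }
        throw new IllegalStateException("Savepoint not completed after " + maxAttempts + " attempts");
    }
}
```

Bounding the number of attempts keeps a stuck savepoint from hanging the command line client indefinitely; a production client might prefer an overall deadline or exponential backoff instead.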



