[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16325161#comment-16325161 ]

ASF GitHub Bot commented on FLINK-8317:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5223

> Enable Triggering of Savepoints via RestfulGateway
> --
>
> Key: FLINK-8317
> URL: https://issues.apache.org/jira/browse/FLINK-8317
> Project: Flink
> Issue Type: New Feature
> Components: Distributed Coordination, REST
> Affects Versions: 1.5.0
> Reporter: Gary Yao
> Assignee: Gary Yao
> Labels: flip-6
> Fix For: 1.5.0
>
>
> Enable triggering of savepoints in FLIP-6 mode via RestfulGateway:
> * Add method to {{CompletableFuture triggerSavepoint(long timestamp, String targetDirectory)}} to {{RestfulGateway}} interface
> * Implement method in {{Dispatcher}} and {{JobMaster}}
> * Implement a new {{AbstractRestHandler}} which allows asynchronous triggering of savepoints

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16325064#comment-16325064 ] ASF GitHub Bot commented on FLINK-8317: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161368455 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java --- @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.savepoints; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.NotFoundException; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody; +import org.apache.flink.runtime.rest.messages.queue.QueueStatus; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.types.Either; +import org.apache.flink.util.SerializedThrowable; + +import 
org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import static java.util.Objects.requireNonNull; + +/** + * HTTP handlers for asynchronous triggering of savepoints. + * + * Drawing savepoints is a potentially long-running operation. To avoid blocking HTTP + * connections, savepoints must be drawn in two steps. First, an HTTP request is issued to trigger + * the savepoint asynchronously. The request will be assigned a {@link SavepointTriggerId}, + * which is returned in the response body. Next, the returned id should be used to poll the status + * of the savepoint until it is finished. + * + * A savepoint is triggered by sending an HTTP {@code POST} request to + * {@code /jobs/:jobid/savepoints}. The HTTP request may contain a JSON body to specify the target + * directory of the savepoint, e.g., +
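The two-step protocol described in the Javadoc above (trigger first, then poll by id) can be illustrated with a minimal sketch. This is not the code from the pull request: the class `TwoStepSavepointSketch`, its methods, and the use of a plain `String` trigger id are assumptions made for illustration, and only JDK classes are used.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the trigger-then-poll pattern; not Flink's SavepointHandlers. */
final class TwoStepSavepointSketch {

    /** Maps a trigger id to the future that will eventually hold the savepoint path. */
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Step 1: register the long-running operation and return an id immediately. */
    public String trigger(CompletableFuture<String> savepointPathFuture) {
        String triggerId = UUID.randomUUID().toString();
        pending.put(triggerId, savepointPathFuture);
        return triggerId;
    }

    /**
     * Step 2: poll by id. Returns an empty Optional while the operation is in progress
     * and the savepoint path once it has completed. A failed future surfaces as a
     * CompletionException from join().
     */
    public Optional<String> poll(String triggerId) {
        CompletableFuture<String> future = pending.get(triggerId);
        if (future == null) {
            throw new IllegalArgumentException("Unknown trigger id: " + triggerId);
        }
        if (!future.isDone()) {
            return Optional.empty();
        }
        return Optional.of(future.join());
    }

    public static void main(String[] args) {
        TwoStepSavepointSketch handlers = new TwoStepSavepointSketch();
        CompletableFuture<String> savepoint = new CompletableFuture<>();

        String id = handlers.trigger(savepoint);
        System.out.println("in progress: " + !handlers.poll(id).isPresent());

        savepoint.complete("/tmp/savepoint-sketch");
        System.out.println("path: " + handlers.poll(id).get());
    }
}
```

Keeping the pending futures in a map keyed by trigger id lets the HTTP connection for the trigger request close right away; a real handler would presumably also need to expire old entries, which this sketch leaves out.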
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324084#comment-16324084 ] ASF GitHub Bot commented on FLINK-8317: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161243072 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java --- @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.savepoints; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.NotFoundException; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody; +import org.apache.flink.runtime.rest.messages.queue.QueueStatus; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.types.Either; +import org.apache.flink.util.SerializedThrowable; + +import 
org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import static java.util.Objects.requireNonNull; + +/** + * HTTP handlers for asynchronous triggering of savepoints. + * + * Drawing savepoints is a potentially long-running operation. To avoid blocking HTTP + * connections, savepoints must be drawn in two steps. First, an HTTP request is issued to trigger + * the savepoint asynchronously. The request will be assigned a {@link SavepointTriggerId}, + * which is returned in the response body. Next, the returned id should be used to poll the status + * of the savepoint until it is finished. + * + * A savepoint is triggered by sending an HTTP {@code POST} request to + * {@code /jobs/:jobid/savepoints}. The HTTP request may contain a JSON body to specify the target + * directory of the savepoint,
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324087#comment-16324087 ]

ASF GitHub Bot commented on FLINK-8317:
---

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5223#discussion_r161243207

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java ---
    @@ -137,4 +138,21 @@ JobID jobId, @RpcTimeout Time timeout) {
             throw new UnsupportedOperationException();
         }
    +
    +    /**
    +     * Triggers a savepoint with the given savepoint directory as a target.
    +     *
    +     * @param targetDirectory Target directory for the savepoint.
    +     * @param timeout for the asynchronous operation
    +     * @return A future to the completed checkpoint
    +     * @throws IllegalStateException If no savepoint directory has been
    +     *         specified and no default savepoint directory has been
    +     *         configured
    +     */
    +    default CompletableFuture triggerSavepoint(
    --- End diff --

    Cool :-)
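The diff above adds `triggerSavepoint` to the gateway interface as a `default` method, next to an existing default method that throws `UnsupportedOperationException`. A minimal sketch of that pattern follows, assuming the new method throws by default as well (its body is not visible in the quoted diff). The names `Gateway`, `LegacyGateway` and `SavepointCapableGateway` are hypothetical, and the `String` return type and single parameter are simplifications rather than the signature from the pull request.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of an interface method with a throwing default; not Flink's RestfulGateway. */
final class DefaultMethodSketch {

    interface Gateway {
        /** Implementations that cannot draw savepoints simply inherit this default. */
        default CompletableFuture<String> triggerSavepoint(String targetDirectory) {
            throw new UnsupportedOperationException();
        }
    }

    /** An existing implementation that compiles unchanged after the method is added. */
    static final class LegacyGateway implements Gateway {}

    /** An implementation that opts in by overriding the default. */
    static final class SavepointCapableGateway implements Gateway {
        @Override
        public CompletableFuture<String> triggerSavepoint(String targetDirectory) {
            return CompletableFuture.completedFuture(targetDirectory + "/savepoint-sketch");
        }
    }

    public static void main(String[] args) {
        System.out.println(new SavepointCapableGateway().triggerSavepoint("/tmp").join());
        try {
            new LegacyGateway().triggerSavepoint("/tmp");
        } catch (UnsupportedOperationException e) {
            System.out.println("legacy gateway does not support savepoints");
        }
    }
}
```

The design choice here is that adding a default method does not force every implementer to change at once; callers have to be prepared for the exception until all relevant implementations override it.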
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324083#comment-16324083 ] ASF GitHub Bot commented on FLINK-8317: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161242967 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java --- @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.savepoints; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.NotFoundException; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody; +import org.apache.flink.runtime.rest.messages.queue.QueueStatus; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.types.Either; +import org.apache.flink.util.SerializedThrowable; + +import 
org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import static java.util.Objects.requireNonNull; + +/** + * HTTP handlers for asynchronous triggering of savepoints. + * + * Drawing savepoints is a potentially long-running operation. To avoid blocking HTTP + * connections, savepoints must be drawn in two steps. First, an HTTP request is issued to trigger + * the savepoint asynchronously. The request will be assigned a {@link SavepointTriggerId}, + * which is returned in the response body. Next, the returned id should be used to poll the status + * of the savepoint until it is finished. + * + * A savepoint is triggered by sending an HTTP {@code POST} request to + * {@code /jobs/:jobid/savepoints}. The HTTP request may contain a JSON body to specify the target + * directory of the savepoint,
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324080#comment-16324080 ] ASF GitHub Bot commented on FLINK-8317: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161242316 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java --- @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.savepoints; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.HandlerRequestException; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; +import org.apache.flink.runtime.rest.messages.queue.QueueStatus; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static 
org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.when; + +/** + * Test for {@link SavepointHandlers}. + */ +public class SavepointHandlersTest { + + private static final CompletableFuture LOCAL_REST_ADDRESS = + CompletableFuture.completedFuture("localhost:12345"); + + private static final Time TIMEOUT = Time.seconds(10); + + private static final JobID JOB_ID = new JobID(); + + private static final String COMPLETED_CHECKPOINT_EXTERNAL_POINTER = "/tmp/savepoint-0d2fb9-8d5e0106041a"; + + private static final String DEFAULT_REQUESTED_SAVEPOINT_TARGET_DIRECTORY = "/tmp"; + + @Mock + private RestfulGateway mockRestfulGateway; + + private SavepointHandlers savepointHandlers; + + private SavepointHandlers.SavepointTriggerHandler savepointTriggerHandler; + + private SavepointHandlers.SavepointStatusHandler savepointStatusHandler; + + private CompletedCheckpoint
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324076#comment-16324076 ] ASF GitHub Bot commented on FLINK-8317: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161242236 --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java --- @@ -294,44 +306,108 @@ public void testSubmitJobAndWaitForExecutionResult() throws Exception { @Test public void testTriggerSavepoint() throws Exception { - String targetSavepointDirectory = "/alternate"; - TestSavepointTriggerHandler triggerHandler = new TestSavepointTriggerHandler(targetSavepointDirectory); - try (TestRestServerEndpoint ignored = createRestServerEndpoint(triggerHandler)) { + final String targetSavepointDirectory = "/tmp"; + final TestSavepointHandlers testSavepointHandlers = new TestSavepointHandlers(); + final TestSavepointHandlers.TestSavepointTriggerHandler triggerHandler = + testSavepointHandlers.new TestSavepointTriggerHandler( + Arrays.asList(null, targetSavepointDirectory, null).iterator()); + final TestSavepointHandlers.TestSavepointHandler savepointHandler = + testSavepointHandlers.new TestSavepointHandler(Arrays.asList( + new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo( + testSavepointHandlers.testSavepointTriggerId, + "/other/savepoint-0d2fb9-8d5e0106041a", + null)), + new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo( + testSavepointHandlers.testSavepointTriggerId, + "/tmp/savepoint-0d2fb9-8d5e0106041a", + null)), + new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo( + testSavepointHandlers.testSavepointTriggerId, + null, + new SerializedThrowable(new RuntimeException("expected"))))).iterator()); + try (TestRestServerEndpoint ignored = createRestServerEndpoint( + triggerHandler, + savepointHandler)) { + JobID id = new JobID(); { CompletableFuture savepointPathFuture = 
restClusterClient.triggerSavepoint(id, null); String savepointPath = savepointPathFuture.get(); - Assert.assertEquals("/universe", savepointPath); + assertEquals("/other/savepoint-0d2fb9-8d5e0106041a", savepointPath); } { CompletableFuture savepointPathFuture = restClusterClient.triggerSavepoint(id, targetSavepointDirectory); String savepointPath = savepointPathFuture.get(); - Assert.assertEquals(targetSavepointDirectory + "/universe", savepointPath); + assertEquals("/tmp/savepoint-0d2fb9-8d5e0106041a", savepointPath); --- End diff -- done
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324079#comment-16324079 ]

ASF GitHub Bot commented on FLINK-8317:
---

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5223#discussion_r161242312

    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointInfoTest.java ---
    @@ -0,0 +1,52 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest.messages.job.savepoints;
    +
    +import org.apache.flink.util.SerializedThrowable;
    +
    +import org.junit.Test;
    +
    +import static org.junit.Assert.fail;
    +
    +/**
    + * Tests for {@link SavepointInfo}.
    + */
    +public class SavepointInfoTest {
    --- End diff --

    fixed
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324081#comment-16324081 ] ASF GitHub Bot commented on FLINK-8317: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161242333 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java --- @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.savepoints; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.HandlerRequestException; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; +import org.apache.flink.runtime.rest.messages.queue.QueueStatus; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static 
org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.when; + +/** + * Test for {@link SavepointHandlers}. + */ +public class SavepointHandlersTest { --- End diff -- fixed
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324077#comment-16324077 ] ASF GitHub Bot commented on FLINK-8317: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161242245 --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java --- @@ -294,44 +306,108 @@ public void testSubmitJobAndWaitForExecutionResult() throws Exception { @Test public void testTriggerSavepoint() throws Exception { - String targetSavepointDirectory = "/alternate"; - TestSavepointTriggerHandler triggerHandler = new TestSavepointTriggerHandler(targetSavepointDirectory); - try (TestRestServerEndpoint ignored = createRestServerEndpoint(triggerHandler)) { + final String targetSavepointDirectory = "/tmp"; + final TestSavepointHandlers testSavepointHandlers = new TestSavepointHandlers(); + final TestSavepointHandlers.TestSavepointTriggerHandler triggerHandler = + testSavepointHandlers.new TestSavepointTriggerHandler( + Arrays.asList(null, targetSavepointDirectory, null).iterator()); + final TestSavepointHandlers.TestSavepointHandler savepointHandler = + testSavepointHandlers.new TestSavepointHandler(Arrays.asList( + new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo( + testSavepointHandlers.testSavepointTriggerId, + "/other/savepoint-0d2fb9-8d5e0106041a", + null)), + new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo( + testSavepointHandlers.testSavepointTriggerId, + "/tmp/savepoint-0d2fb9-8d5e0106041a", + null)), + new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo( + testSavepointHandlers.testSavepointTriggerId, + null, + new SerializedThrowable(new RuntimeException("expected"))))).iterator()); + try (TestRestServerEndpoint ignored = createRestServerEndpoint( + triggerHandler, + savepointHandler)) { + JobID id = new JobID(); { CompletableFuture savepointPathFuture = 
restClusterClient.triggerSavepoint(id, null); String savepointPath = savepointPathFuture.get(); - Assert.assertEquals("/universe", savepointPath); + assertEquals("/other/savepoint-0d2fb9-8d5e0106041a", savepointPath); --- End diff -- done
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324078#comment-16324078 ] ASF GitHub Bot commented on FLINK-8317: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161242267 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -185,37 +205,19 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException { } } - private JobResult waitForJobExecutionResult( - final JobID jobId) throws ProgramInvocationException { - - final JobMessageParameters messageParameters = new JobMessageParameters(); - messageParameters.jobPathParameter.resolve(jobId); - JobExecutionResultResponseBody jobExecutionResultResponseBody; - try { - long attempt = 0; - do { - final CompletableFuture responseFuture = - restClient.sendRequest( - restClusterClientConfiguration.getRestServerAddress(), - restClusterClientConfiguration.getRestServerPort(), - JobExecutionResultHeaders.getInstance(), - messageParameters); - jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); - Thread.sleep(waitStrategy.sleepTime(attempt)); - attempt++; - } - while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED); - } catch (IOException | TimeoutException | ExecutionException e) { - throw new ProgramInvocationException(e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ProgramInvocationException(e); + privateR waitForResource( + final SupplierWithException resourceFutureSupplier) + throws IOException, InterruptedException, ExecutionException, TimeoutException { + T asynchronouslyCreatedResource; + long attempt = 0; + do { + final CompletableFuture responseFuture = resourceFutureSupplier.get(); + asynchronouslyCreatedResource = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + 
Thread.sleep(waitStrategy.sleepTime(attempt)); --- End diff -- fixed
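For readers following the `waitForResource` hunk quoted above, here is a minimal, self-contained sketch of the general poll-with-backoff pattern. It is not the code from the pull request: `PollingSketch`, `pollUntilComplete`, and the string statuses are hypothetical names chosen for illustration, and the backoff function stands in for whatever wait strategy the client uses. The sketch checks for completion before sleeping, so a finished resource is returned without an extra wait.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongUnaryOperator;
import java.util.function.Predicate;
import java.util.function.Supplier;

public class PollingSketch {

    /**
     * Repeatedly issues a request until isDone accepts the response.
     * Sleeps only between unsuccessful attempts (hypothetical helper, not Flink API).
     */
    static <T> T pollUntilComplete(
            Supplier<CompletableFuture<T>> request,
            Predicate<T> isDone,
            LongUnaryOperator sleepMillis,
            long timeoutMillis) throws Exception {
        long attempt = 0;
        while (true) {
            // Bound the wait for each individual response.
            T response = request.get().get(timeoutMillis, TimeUnit.MILLISECONDS);
            if (isDone.test(response)) {
                return response;
            }
            Thread.sleep(sleepMillis.applyAsLong(attempt));
            attempt++;
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Fake endpoint: reports IN_PROGRESS twice, then COMPLETED.
        String result = pollUntilComplete(
                () -> CompletableFuture.completedFuture(
                        calls.incrementAndGet() < 3 ? "IN_PROGRESS" : "COMPLETED"),
                "COMPLETED"::equals,
                attempt -> Math.min(10L * (attempt + 1), 50L),
                1_000L);
        System.out.println(result + " after " + calls.get() + " requests");
    }
}
```

Passing the request as a supplier keeps the loop independent of the resource being polled, which is the same idea the hunk expresses with `resourceFutureSupplier`.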
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324039#comment-16324039 ] ASF GitHub Bot commented on FLINK-8317: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161238338 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java --- @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.savepoints; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.NotFoundException; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody; +import org.apache.flink.runtime.rest.messages.queue.QueueStatus; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.types.Either; +import org.apache.flink.util.SerializedThrowable; + +import 
org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import static java.util.Objects.requireNonNull; + +/** + * HTTP handlers for asynchronous triggering of savepoints. + * + * Drawing savepoints is a potentially long-running operation. To avoid blocking HTTP + * connections, savepoints must be drawn in two steps. First, an HTTP request is issued to trigger + * the savepoint asynchronously. The request will be assigned a {@link SavepointTriggerId}, + * which is returned in the response body. Next, the returned id should be used to poll the status + * of the savepoint until it is finished. + * + * A savepoint is triggered by sending an HTTP {@code POST} request to + * {@code /jobs/:jobid/savepoints}. The HTTP request may contain a JSON body to specify the target + * directory of the savepoint, e.g., +
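The two-step protocol described in the Javadoc quoted above (trigger first, then poll by id) can be sketched in a few lines. This is an in-memory illustration under stated assumptions, not the handlers from the pull request: `TwoStepSavepointSketch`, `trigger`, and `poll` are hypothetical names, the trigger id is a random UUID string, and the savepoint itself is represented by a future that the caller completes.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

public class TwoStepSavepointSketch {

    // Trigger id -> future that completes with the savepoint path (hypothetical bookkeeping).
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Step 1: register the long-running operation and return an id immediately. */
    String trigger(CompletableFuture<String> savepointFuture) {
        String triggerId = UUID.randomUUID().toString();
        pending.put(triggerId, savepointFuture);
        return triggerId;
    }

    /** Step 2: poll by id; empty while the operation is still in progress. */
    Optional<String> poll(String triggerId) {
        CompletableFuture<String> future = pending.get(triggerId);
        if (future == null) {
            throw new IllegalArgumentException("Unknown trigger id: " + triggerId);
        }
        return future.isDone() ? Optional.of(future.join()) : Optional.empty();
    }

    public static void main(String[] args) {
        TwoStepSavepointSketch sketch = new TwoStepSavepointSketch();
        CompletableFuture<String> savepoint = new CompletableFuture<>();

        String id = sketch.trigger(savepoint);
        System.out.println(sketch.poll(id).isPresent()); // still in progress

        savepoint.complete("/tmp/savepoint-example"); // made-up path
        System.out.println(sketch.poll(id).orElse("in progress"));
    }
}
```

Neither call blocks on the savepoint itself, which is the point the Javadoc makes about not holding HTTP connections open.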
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324032#comment-16324032 ] ASF GitHub Bot commented on FLINK-8317: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161236268 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java --- @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.savepoints; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.NotFoundException; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody; +import org.apache.flink.runtime.rest.messages.queue.QueueStatus; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.types.Either; +import org.apache.flink.util.SerializedThrowable; + +import 
org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import static java.util.Objects.requireNonNull; + +/** + * HTTP handlers for asynchronous triggering of savepoints. + * + * Drawing savepoints is a potentially long-running operation. To avoid blocking HTTP + * connections, savepoints must be drawn in two steps. First, an HTTP request is issued to trigger + * the savepoint asynchronously. The request will be assigned a {@link SavepointTriggerId}, + * which is returned in the response body. Next, the returned id should be used to poll the status + * of the savepoint until it is finished. + * + * A savepoint is triggered by sending an HTTP {@code POST} request to + * {@code /jobs/:jobid/savepoints}. The HTTP request may contain a JSON body to specify the target + * directory of the savepoint, e.g., +
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324024#comment-16324024 ] ASF GitHub Bot commented on FLINK-8317: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161235482 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java --- @@ -137,4 +138,21 @@ JobID jobId, @RpcTimeout Time timeout) { throw new UnsupportedOperationException(); } + + /** +* Triggers a savepoint with the given savepoint directory as a target. +* +* @param targetDirectory Target directory for the savepoint. +* @param timeout for the asynchronous operation +* @return A future to the completed checkpoint +* @throws IllegalStateException If no savepoint directory has been +* specified and no default savepoint directory has been +* configured +*/ + default CompletableFuture triggerSavepoint( --- End diff -- I can make it return the external pointer.
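A rough sketch of the pattern in the hunk above: a default interface method that throws `UnsupportedOperationException`, so that existing implementations keep compiling, plus one implementation that opts in. This assumes the future carries the savepoint's external pointer as a `String`, which is one reading of the comment and not confirmed by the thread. `SavepointGateway` and `InMemoryGateway` are hypothetical stand-ins, not Flink's `RestfulGateway`, and the signature is simplified to a single parameter.

```java
import java.util.concurrent.CompletableFuture;

public class GatewaySketch {

    /** Hypothetical stand-in for a gateway interface; not Flink's RestfulGateway. */
    interface SavepointGateway {
        /** Unsupported by default, so implementations that predate this method are unaffected. */
        default CompletableFuture<String> triggerSavepoint(String targetDirectory) {
            throw new UnsupportedOperationException();
        }
    }

    /** Implementation that opts in and completes with a made-up savepoint path. */
    static class InMemoryGateway implements SavepointGateway {
        @Override
        public CompletableFuture<String> triggerSavepoint(String targetDirectory) {
            if (targetDirectory == null) {
                // Mirrors the documented failure case: no directory given, no default configured.
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(
                        new IllegalStateException("No savepoint directory specified"));
                return failed;
            }
            return CompletableFuture.completedFuture(targetDirectory + "/savepoint-example");
        }
    }

    public static void main(String[] args) {
        SavepointGateway gateway = new InMemoryGateway();
        System.out.println(gateway.triggerSavepoint("/tmp").join());
        System.out.println(gateway.triggerSavepoint(null).isCompletedExceptionally());
    }
}
```

Returning a plain path keeps the REST layer free of runtime-internal types, at the cost of dropping any other checkpoint metadata from the result.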
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324022#comment-16324022 ] ASF GitHub Bot commented on FLINK-8317: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161234791 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -185,37 +205,19 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException { } } - private JobResult waitForJobExecutionResult( - final JobID jobId) throws ProgramInvocationException { - - final JobMessageParameters messageParameters = new JobMessageParameters(); - messageParameters.jobPathParameter.resolve(jobId); - JobExecutionResultResponseBody jobExecutionResultResponseBody; - try { - long attempt = 0; - do { - final CompletableFuture responseFuture = - restClient.sendRequest( - restClusterClientConfiguration.getRestServerAddress(), - restClusterClientConfiguration.getRestServerPort(), - JobExecutionResultHeaders.getInstance(), - messageParameters); - jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); - Thread.sleep(waitStrategy.sleepTime(attempt)); - attempt++; - } - while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED); - } catch (IOException | TimeoutException | ExecutionException e) { - throw new ProgramInvocationException(e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ProgramInvocationException(e); + privateR waitForResource( + final SupplierWithException resourceFutureSupplier) + throws IOException, InterruptedException, ExecutionException, TimeoutException { + T asynchronouslyCreatedResource; + long attempt = 0; + do { + final CompletableFuture responseFuture = resourceFutureSupplier.get(); + asynchronouslyCreatedResource = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + 
Thread.sleep(waitStrategy.sleepTime(attempt)); --- End diff -- True
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324008#comment-16324008 ] ASF GitHub Bot commented on FLINK-8317: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161228957 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java --- @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.savepoints; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.HandlerRequestException; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; +import org.apache.flink.runtime.rest.messages.queue.QueueStatus; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static 
org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.when; + +/** + * Test for {@link SavepointHandlers}. + */ +public class SavepointHandlersTest { --- End diff -- Missing `extends TestLogger`
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323999#comment-16323999 ] ASF GitHub Bot commented on FLINK-8317: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161220157 --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java --- @@ -294,44 +306,108 @@ public void testSubmitJobAndWaitForExecutionResult() throws Exception { @Test public void testTriggerSavepoint() throws Exception { - String targetSavepointDirectory = "/alternate"; - TestSavepointTriggerHandler triggerHandler = new TestSavepointTriggerHandler(targetSavepointDirectory); - try (TestRestServerEndpoint ignored = createRestServerEndpoint(triggerHandler)) { + final String targetSavepointDirectory = "/tmp"; + final TestSavepointHandlers testSavepointHandlers = new TestSavepointHandlers(); + final TestSavepointHandlers.TestSavepointTriggerHandler triggerHandler = + testSavepointHandlers.new TestSavepointTriggerHandler( + Arrays.asList(null, targetSavepointDirectory, null).iterator()); + final TestSavepointHandlers.TestSavepointHandler savepointHandler = + testSavepointHandlers.new TestSavepointHandler(Arrays.asList( + new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo( + testSavepointHandlers.testSavepointTriggerId, + "/other/savepoint-0d2fb9-8d5e0106041a", + null)), + new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo( + testSavepointHandlers.testSavepointTriggerId, + "/tmp/savepoint-0d2fb9-8d5e0106041a", + null)), + new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo( + testSavepointHandlers.testSavepointTriggerId, + null, + new SerializedThrowable(new RuntimeException("expected").iterator()); + try (TestRestServerEndpoint ignored = createRestServerEndpoint( + triggerHandler, + savepointHandler)) { + JobID id = new JobID(); { CompletableFuture 
savepointPathFuture = restClusterClient.triggerSavepoint(id, null); String savepointPath = savepointPathFuture.get(); - Assert.assertEquals("/universe", savepointPath); + assertEquals("/other/savepoint-0d2fb9-8d5e0106041a", savepointPath); } { CompletableFuture savepointPathFuture = restClusterClient.triggerSavepoint(id, targetSavepointDirectory); String savepointPath = savepointPathFuture.get(); - Assert.assertEquals(targetSavepointDirectory + "/universe", savepointPath); + assertEquals("/tmp/savepoint-0d2fb9-8d5e0106041a", savepointPath); --- End diff -- Same here.
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324005#comment-16324005 ] ASF GitHub Bot commented on FLINK-8317: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161220129 --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java --- @@ -294,44 +306,108 @@ public void testSubmitJobAndWaitForExecutionResult() throws Exception { @Test public void testTriggerSavepoint() throws Exception { - String targetSavepointDirectory = "/alternate"; - TestSavepointTriggerHandler triggerHandler = new TestSavepointTriggerHandler(targetSavepointDirectory); - try (TestRestServerEndpoint ignored = createRestServerEndpoint(triggerHandler)) { + final String targetSavepointDirectory = "/tmp"; + final TestSavepointHandlers testSavepointHandlers = new TestSavepointHandlers(); + final TestSavepointHandlers.TestSavepointTriggerHandler triggerHandler = + testSavepointHandlers.new TestSavepointTriggerHandler( + Arrays.asList(null, targetSavepointDirectory, null).iterator()); + final TestSavepointHandlers.TestSavepointHandler savepointHandler = + testSavepointHandlers.new TestSavepointHandler(Arrays.asList( + new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo( + testSavepointHandlers.testSavepointTriggerId, + "/other/savepoint-0d2fb9-8d5e0106041a", + null)), + new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo( + testSavepointHandlers.testSavepointTriggerId, + "/tmp/savepoint-0d2fb9-8d5e0106041a", + null)), + new SavepointResponseBody(QueueStatus.completed(), new SavepointInfo( + testSavepointHandlers.testSavepointTriggerId, + null, + new SerializedThrowable(new RuntimeException("expected").iterator()); + try (TestRestServerEndpoint ignored = createRestServerEndpoint( + triggerHandler, + savepointHandler)) { + JobID id = new JobID(); { CompletableFuture 
savepointPathFuture = restClusterClient.triggerSavepoint(id, null); String savepointPath = savepointPathFuture.get(); - Assert.assertEquals("/universe", savepointPath); + assertEquals("/other/savepoint-0d2fb9-8d5e0106041a", savepointPath); --- End diff -- Maybe we could declare a variable with this value. Makes it easier to refactor later on.
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324002#comment-16324002 ] ASF GitHub Bot commented on FLINK-8317: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161228758 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java --- @@ -27,11 +27,16 @@ import java.util.concurrent.TimeoutException; /** - * Utility functions for Flink's RPC implementation + * Utility functions for Flink's RPC implementation. */ public class RpcUtils { - public static final Time INF_TIMEOUT = Time.milliseconds(Long.MAX_VALUE); + /** +* HACK: Set to 21474835 seconds, Akka's maximum delay (Akka 2.4.20). The value cannot be +* higher or an {@link IllegalArgumentException} will be thrown during an RPC. Check the private +* method {@code checkMaxDelay()} in {@link akka.actor.LightArrayRevolverScheduler}. --- End diff -- Good catch :-)
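The figure of 21474835 seconds in the Javadoc above can be reproduced with a little arithmetic. The sketch below rests on two assumptions that are not stated in the thread: that the scheduler rejects any delay longer than `Integer.MAX_VALUE` ticks, and that the tick duration is 10 ms (believed to be the default for `akka.scheduler.tick-duration`). `MaxDelaySketch` and its methods are hypothetical names; this is plain Java, not Akka code.

```java
import java.util.concurrent.TimeUnit;

public class MaxDelaySketch {

    /** Assumed tick duration of 10 ms (believed to be Akka's default; an assumption here). */
    static final long TICK_NANOS = TimeUnit.MILLISECONDS.toNanos(10);

    /** Largest whole-second delay, keeping a one-second margin for rounding. */
    static long maxDelaySeconds() {
        return TimeUnit.NANOSECONDS.toSeconds(TICK_NANOS * Integer.MAX_VALUE) - 1;
    }

    /** True if the delay spans at most Integer.MAX_VALUE ticks (the assumed bound). */
    static boolean fits(long delayNanos) {
        return delayNanos / TICK_NANOS <= Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        System.out.println(maxDelaySeconds()); // 21474835
        System.out.println(fits(TimeUnit.SECONDS.toNanos(21_474_835L))); // true
        // The old constant, Long.MAX_VALUE milliseconds, is far beyond the bound.
        System.out.println(fits(TimeUnit.MILLISECONDS.toNanos(Long.MAX_VALUE))); // false
    }
}
```

Under these assumptions, 2^31 - 1 ticks of 10 ms is about 21474836.47 seconds, and subtracting the one-second margin gives the constant quoted in the diff.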
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324003#comment-16324003 ] ASF GitHub Bot commented on FLINK-8317: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161224329 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java --- @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.savepoints; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.NotFoundException; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody; +import org.apache.flink.runtime.rest.messages.queue.QueueStatus; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.types.Either; +import org.apache.flink.util.SerializedThrowable; + +import 
org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import static java.util.Objects.requireNonNull; + +/** + * HTTP handlers for asynchronous triggering of savepoints. + * + * Drawing savepoints is a potentially long-running operation. To avoid blocking HTTP + * connections, savepoints must be drawn in two steps. First, an HTTP request is issued to trigger + * the savepoint asynchronously. The request will be assigned a {@link SavepointTriggerId}, + * which is returned in the response body. Next, the returned id should be used to poll the status + * of the savepoint until it is finished. + * + * A savepoint is triggered by sending an HTTP {@code POST} request to + * {@code /jobs/:jobid/savepoints}. The HTTP request may contain a JSON body to specify the target + * directory of the savepoint,
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324004#comment-16324004 ] ASF GitHub Bot commented on FLINK-8317: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161224559 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlers.java --- @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.savepoints; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.NotFoundException; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointInfo; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerHeaders; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerResponseBody; +import org.apache.flink.runtime.rest.messages.queue.QueueStatus; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.types.Either; +import org.apache.flink.util.SerializedThrowable; + +import 
org.apache.flink.shaded.guava18.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.Immutable; +import javax.annotation.concurrent.ThreadSafe; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import static java.util.Objects.requireNonNull; + +/** + * HTTP handlers for asynchronous triggering of savepoints. + * + * Drawing savepoints is a potentially long-running operation. To avoid blocking HTTP + * connections, savepoints must be drawn in two steps. First, an HTTP request is issued to trigger + * the savepoint asynchronously. The request will be assigned a {@link SavepointTriggerId}, + * which is returned in the response body. Next, the returned id should be used to poll the status + * of the savepoint until it is finished. + * + * A savepoint is triggered by sending an HTTP {@code POST} request to + * {@code /jobs/:jobid/savepoints}. The HTTP request may contain a JSON body to specify the target + * directory of the savepoint,
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324007#comment-16324007 ]

ASF GitHub Bot commented on FLINK-8317:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161230839

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java ---
@@ -137,4 +138,21 @@ JobID jobId, @RpcTimeout Time timeout) {
 		throw new UnsupportedOperationException();
 	}
+
+	/**
+	 * Triggers a savepoint with the given savepoint directory as a target.
+	 *
+	 * @param targetDirectory Target directory for the savepoint.
+	 * @param timeout for the asynchronous operation
+	 * @return A future to the completed checkpoint
+	 * @throws IllegalStateException If no savepoint directory has been
+	 *                               specified and no default savepoint directory has been
+	 *                               configured
+	 */
+	default CompletableFuture<CompletedCheckpoint> triggerSavepoint(
--- End diff --

I'm wondering whether we actually need to expose the `CompletedCheckpoint`. Wouldn't it be enough to return the external path here? That way we would avoid further coupling between external components and internal data structures.
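To illustrate the suggestion in the comment above, here is a minimal sketch of a gateway method that returns only the external path as a string instead of the internal checkpoint object. All types here (`InternalCheckpoint`, `SavepointGateway`, `gatewayBackedBy`) are hypothetical stand-ins invented for this example; they are not Flink's `CompletedCheckpoint` or `RestfulGateway`, and the real signature in the pull request may differ.

```java
import java.util.concurrent.CompletableFuture;

public class SavepointPathSketch {

    /** Hypothetical internal data structure that callers should not have to depend on. */
    static final class InternalCheckpoint {
        private final String externalPointer;

        InternalCheckpoint(String externalPointer) {
            this.externalPointer = externalPointer;
        }

        String getExternalPointer() {
            return externalPointer;
        }
    }

    /** Hypothetical gateway that exposes only the savepoint path. */
    interface SavepointGateway {
        CompletableFuture<String> triggerSavepoint(String targetDirectory);
    }

    /** Adapts a future of the internal structure to a future of its external path. */
    static SavepointGateway gatewayBackedBy(CompletableFuture<InternalCheckpoint> internalResult) {
        return targetDirectory -> internalResult.thenApply(InternalCheckpoint::getExternalPointer);
    }

    public static void main(String[] args) {
        SavepointGateway gateway = gatewayBackedBy(
            CompletableFuture.completedFuture(new InternalCheckpoint("/tmp/savepoint-example")));

        // The caller only ever sees a String, never the internal checkpoint type.
        System.out.println(gateway.triggerSavepoint("/tmp").join());
    }
}
```

With this shape, REST handlers and clients depend on a plain path, so the internal checkpoint representation can change without touching them.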
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324001#comment-16324001 ] ASF GitHub Bot commented on FLINK-8317: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161229580 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java --- @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.savepoints; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.HandlerRequestException; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; +import org.apache.flink.runtime.rest.messages.queue.QueueStatus; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static 
org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.when; + +/** + * Test for {@link SavepointHandlers}. + */ +public class SavepointHandlersTest { + + private static final CompletableFuture LOCAL_REST_ADDRESS = + CompletableFuture.completedFuture("localhost:12345"); + + private static final Time TIMEOUT = Time.seconds(10); + + private static final JobID JOB_ID = new JobID(); + + private static final String COMPLETED_CHECKPOINT_EXTERNAL_POINTER = "/tmp/savepoint-0d2fb9-8d5e0106041a"; + + private static final String DEFAULT_REQUESTED_SAVEPOINT_TARGET_DIRECTORY = "/tmp"; + + @Mock + private RestfulGateway mockRestfulGateway; + + private SavepointHandlers savepointHandlers; + + private SavepointHandlers.SavepointTriggerHandler savepointTriggerHandler; + + private SavepointHandlers.SavepointStatusHandler savepointStatusHandler; + + private CompletedCheckpoint
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324006#comment-16324006 ] ASF GitHub Bot commented on FLINK-8317: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161230366 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/savepoints/SavepointInfoTest.java --- @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages.job.savepoints; + +import org.apache.flink.util.SerializedThrowable; + +import org.junit.Test; + +import static org.junit.Assert.fail; + +/** + * Tests for {@link SavepointInfo}. 
+ */
+public class SavepointInfoTest {
--- End diff --

`extends TestLogger`
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16324000#comment-16324000 ] ASF GitHub Bot commented on FLINK-8317: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161218205 --- Diff: flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java --- @@ -185,37 +205,19 @@ private void submitJob(JobGraph jobGraph) throws JobSubmissionException { } } - private JobResult waitForJobExecutionResult( - final JobID jobId) throws ProgramInvocationException { - - final JobMessageParameters messageParameters = new JobMessageParameters(); - messageParameters.jobPathParameter.resolve(jobId); - JobExecutionResultResponseBody jobExecutionResultResponseBody; - try { - long attempt = 0; - do { - final CompletableFuture responseFuture = - restClient.sendRequest( - restClusterClientConfiguration.getRestServerAddress(), - restClusterClientConfiguration.getRestServerPort(), - JobExecutionResultHeaders.getInstance(), - messageParameters); - jobExecutionResultResponseBody = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); - Thread.sleep(waitStrategy.sleepTime(attempt)); - attempt++; - } - while (jobExecutionResultResponseBody.getStatus().getStatusId() != QueueStatus.StatusId.COMPLETED); - } catch (IOException | TimeoutException | ExecutionException e) { - throw new ProgramInvocationException(e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new ProgramInvocationException(e); + privateR waitForResource( + final SupplierWithException resourceFutureSupplier) + throws IOException, InterruptedException, ExecutionException, TimeoutException { + T asynchronouslyCreatedResource; + long attempt = 0; + do { + final CompletableFuture responseFuture = resourceFutureSupplier.get(); + asynchronouslyCreatedResource = responseFuture.get(timeout.toMillis(), TimeUnit.MILLISECONDS); + 
Thread.sleep(waitStrategy.sleepTime(attempt));
--- End diff --

I think that we don't have to wait if `asynchronouslyCreatedResource.queueStatus().getId() == QueueStatus.Id.COMPLETED`.
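The point of the comment above can be sketched as a polling loop that checks the status before sleeping, so that a resource which is already complete is returned without any extra wait. This is only an illustration: `Pollable`, its methods, and this `waitForResource` signature are hypothetical and deliberately simpler than the code in `RestClusterClient`.

```java
import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;

public class PollUntilCompletedSketch {

    /** Hypothetical stand-in for a polled response carrying a status and a result. */
    interface Pollable<R> {
        boolean isCompleted();

        R resource();
    }

    /** Polls until completion; sleeps only when the last response was not yet completed. */
    static <R> R waitForResource(Supplier<Pollable<R>> poll, LongUnaryOperator sleepTimeMillis) {
        long attempt = 0;
        while (true) {
            Pollable<R> response = poll.get();
            if (response.isCompleted()) {
                return response.resource(); // no sleep after the final, successful poll
            }
            try {
                Thread.sleep(sleepTimeMillis.applyAsLong(attempt));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while polling", e);
            }
            attempt++;
        }
    }

    public static void main(String[] args) {
        final int[] polls = {0};
        String result = waitForResource(() -> {
            final int current = ++polls[0];
            return new Pollable<String>() {
                public boolean isCompleted() {
                    return current >= 3; // completes on the third poll
                }

                public String resource() {
                    return "savepoint";
                }
            };
        }, attempt -> 1L);

        System.out.println(result + " after " + polls[0] + " polls");
    }
}
```

Compared with a do-while loop that always sleeps after each request, this variant saves one sleep interval per call, which matters most when the wait strategy backs off exponentially.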
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323700#comment-16323700 ] ASF GitHub Bot commented on FLINK-8317: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161163563 --- Diff: flink-clients/src/test/java/org/apache/flink/client/program/rest/retry/ExponentialWaitStrategyTest.java --- @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.client.program.rest.retry; + +import org.junit.Test; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + +/** + * Tests for {@link ExponentialWaitStrategy}. 
+ */
+public class ExponentialWaitStrategyTest {
--- End diff --

done
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323699#comment-16323699 ] ASF GitHub Bot commented on FLINK-8317: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r161163558 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java --- @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.savepoints; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.HandlerRequestException; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; +import org.apache.flink.runtime.rest.messages.queue.QueueStatus; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static 
org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.when; + +/** + * Test for {@link SavepointHandlers}. + */ +public class SavepointHandlersTest { + + private static final CompletableFuture LOCAL_REST_ADDRESS = + CompletableFuture.completedFuture("localhost:12345"); + + private static final Time TIMEOUT = Time.seconds(10); + + private static final JobID JOB_ID = new JobID(); + + private static final String COMPLETED_CHECKPOINT_EXTERNAL_POINTER = "/tmp/savepoint-0d2fb9-8d5e0106041a"; + + private static final String DEFAULT_REQUESTED_SAVEPOINT_TARGET_DIRECTORY = "/tmp"; + + @Mock + private RestfulGateway mockRestfulGateway; + + private SavepointHandlers savepointHandlers; + + private SavepointHandlers.SavepointTriggerHandler savepointTriggerHandler; + + private SavepointHandlers.SavepointStatusHandler savepointStatusHandler; + + private CompletedCheckpoint
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16323698#comment-16323698 ]

ASF GitHub Bot commented on FLINK-8317:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5223#discussion_r161163547

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java ---
@@ -27,11 +27,16 @@ import java.util.concurrent.TimeoutException;

 /**
- * Utility functions for Flink's RPC implementation
+ * Utility functions for Flink's RPC implementation.
  */
 public class RpcUtils {

-	public static final Time INF_TIMEOUT = Time.milliseconds(Long.MAX_VALUE);
+	/**
+	 * HACK: Set to 21474835 seconds, Akka's maximum delay (Akka 2.4.20). The value cannot be
+	 * higher or an {@link IllegalArgumentException} will be thrown during an RPC. Check the private
+	 * method {@code checkMaxDelay()} in {@link akka.actor.LightArrayRevolverScheduler}.
+	 */
+	public static final Time INF_TIMEOUT = Time.milliseconds(TimeUnit.SECONDS.toMillis(21474835));
--- End diff --

done
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16318563#comment-16318563 ] ASF GitHub Bot commented on FLINK-8317: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r160431822 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/savepoints/SavepointHandlersTest.java --- @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler.job.savepoints; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.checkpoint.CheckpointProperties; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.HandlerRequestException; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.SavepointTriggerIdPathParameter; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointResponseBody; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointStatusMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerId; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerMessageParameters; +import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointTriggerRequestBody; +import org.apache.flink.runtime.rest.messages.queue.QueueStatus; +import org.apache.flink.runtime.state.filesystem.FileStateHandle; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static 
org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.when; + +/** + * Test for {@link SavepointHandlers}. + */ +public class SavepointHandlersTest { + + private static final CompletableFuture<String> LOCAL_REST_ADDRESS = + CompletableFuture.completedFuture("localhost:12345"); + + private static final Time TIMEOUT = Time.seconds(10); + + private static final JobID JOB_ID = new JobID(); + + private static final String COMPLETED_CHECKPOINT_EXTERNAL_POINTER = "/tmp/savepoint-0d2fb9-8d5e0106041a"; + + private static final String DEFAULT_REQUESTED_SAVEPOINT_TARGET_DIRECTORY = "/tmp"; + + @Mock + private RestfulGateway mockRestfulGateway; + + private SavepointHandlers savepointHandlers; + + private SavepointHandlers.SavepointTriggerHandler savepointTriggerHandler; + + private SavepointHandlers.SavepointStatusHandler savepointStatusHandler; + + private CompletedCheckpoint
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16308212#comment-16308212 ] ASF GitHub Bot commented on FLINK-8317: --- Github user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5223#discussion_r159247245 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcUtils.java --- @@ -27,11 +27,16 @@ import java.util.concurrent.TimeoutException; /** - * Utility functions for Flink's RPC implementation + * Utility functions for Flink's RPC implementation. */ public class RpcUtils { - public static final Time INF_TIMEOUT = Time.milliseconds(Long.MAX_VALUE); + /** +* HACK: Set to 21474835 seconds, Akka's maximum delay (Akka 2.4.20). The value cannot be +* higher or an {@link IllegalArgumentException} will be thrown during an RPC. Check the private +* method {@code checkMaxDelay()} in {@link akka.actor.LightArrayRevolverScheduler}. +*/ + public static final Time INF_TIMEOUT = Time.milliseconds(TimeUnit.SECONDS.toMillis(21474835)); --- End diff -- `Time.seconds(21474835)` > Enable Triggering of Savepoints via RestfulGateway > -- > > Key: FLINK-8317 > URL: https://issues.apache.org/jira/browse/FLINK-8317 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > Enable triggering of savepoints in FLIP-6 mode via RestfulGateway: > * Add method to {{CompletableFuture > triggerSavepoint(long timestamp, String targetDirectory)}} to interface > * Implement method in {{Dispatcher}} and {{JobMaster}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
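The arithmetic behind the `INF_TIMEOUT` change in the diff above can be checked with a small standalone sketch. It assumes that Akka's scheduler uses a tick duration of 10 ms and rejects any delay that needs more than `Integer.MAX_VALUE` ticks; the diff only names `checkMaxDelay()`, so both points are assumptions here, and the class and method names below are hypothetical. The sketch does not use Flink's `Time` class, only the millisecond values involved.

```java
import java.util.concurrent.TimeUnit;

class InfTimeoutSketch {

    // Assumption: Akka's scheduler tick duration is 10 ms (not stated in the diff).
    static final long ASSUMED_TICK_MILLIS = 10L;

    // Value taken from the diff.
    static final long INF_TIMEOUT_SECONDS = 21474835L;

    // Largest delay in milliseconds that still fits into Integer.MAX_VALUE ticks.
    static long maxDelayMillis(long tickMillis) {
        return Math.multiplyExact((long) Integer.MAX_VALUE, tickMillis);
    }

    // Assumed shape of the scheduler check: the number of ticks must fit into an int.
    static boolean fitsScheduler(long delayMillis, long tickMillis) {
        return delayMillis / tickMillis <= Integer.MAX_VALUE;
    }

    public static void main(String[] args) {
        // The value the diff builds via TimeUnit.SECONDS.toMillis(...).
        long viaToMillis = TimeUnit.SECONDS.toMillis(INF_TIMEOUT_SECONDS);
        // The value a seconds-based factory would produce if it multiplies by 1000.
        long viaMultiplication = INF_TIMEOUT_SECONDS * 1000L;

        System.out.println("same millisecond value: " + (viaToMillis == viaMultiplication));
        System.out.println("new timeout fits: " + fitsScheduler(viaToMillis, ASSUMED_TICK_MILLIS));
        System.out.println("Long.MAX_VALUE fits: " + fitsScheduler(Long.MAX_VALUE, ASSUMED_TICK_MILLIS));
        System.out.println("assumed maximum delay (ms): " + maxDelayMillis(ASSUMED_TICK_MILLIS));
    }
}
```

Under these assumptions the old value `Long.MAX_VALUE` milliseconds exceeds the limit by many orders of magnitude, while 21474835 seconds stays just below it, and the review suggestion `Time.seconds(21474835)` would describe the same duration more directly.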
[jira] [Commented] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16307518#comment-16307518 ] ASF GitHub Bot commented on FLINK-8317: --- GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5223 [FLINK-8317][flip6] Implement Triggering of Savepoints ## What is the purpose of the change *Implement triggering of savepoints through HTTP and through command line in FLIP-6 mode. This PR is based on #5207.* CC: @tillrohrmann ## Brief change log - *Allow triggering of savepoints through RestfulGateway.* - *Implement REST handlers to trigger and query the status of savepoints.* - *Implement savepoint command in RestClusterClient.* ## Verifying this change This change added tests and can be verified as follows: - *Added unit tests for REST handlers and `RestClusterClient`* - *Manually deployed the `SocketWindowWordCount` job and triggered a savepoint using Flink's command line client and `curl`* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-8317-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5223.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5223 commit e91f15fcbbe52d6d47cc1ba3d35ae4768fc6309d Author: gyao Date: 2017-12-19T17:58:53Z [FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher - Introduce new JobExecutionResult used by JobMaster to forward the information in the already existing JobExecutionResult. - Always cache a JobExecutionResult. Even in case of job failures. In case of job failures, the serialized exception is stored additionally. - Introduce new methods to RestfulGateway to allow retrieval of cached JobExecutionResults commit 748745ac3521a20040cbda4056dfd9c53bc24a82 Author: gyao Date: 2017-12-20T13:44:03Z [FLINK-8233][flip6] Add JobExecutionResultHandler - Allow retrieval of the JobExecutionResult cached in Dispatcher. - Implement serializer and deserializer for JobExecutionResult. commit adf091a2770f42d6f8a0c19ab88cc7a208943a32 Author: gyao Date: 2017-12-20T13:44:26Z [hotfix] Clean up ExecutionGraph - Remove unnecessary throws clause. - Format whitespace. commit f5c28527b3a1a0c8ec52f2a5616ebb634397b69c Author: gyao Date: 2017-12-22T23:02:10Z [FLINK-8299][flip6] Retrieve JobExecutionResult after job submission commit 55d920f628d7ef3f5b0db7fd843dfdd2d96a3917 Author: gyao Date: 2018-01-01T17:59:42Z [FLINK-8317][flip6] Implement savepoints in RestClusterClient Allow triggering of savepoints through RestfulGateway. Implement REST handlers to trigger and query the status of savepoints. Implement savepoint command in RestClusterClient. 
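The pull request describes one REST handler that triggers a savepoint and a second one that queries its status. A minimal sketch of that trigger-then-poll pattern follows. It is not the code from `SavepointHandlers`: the `SavepointGateway` interface, the status strings, and the use of a UUID as trigger id are all hypothetical stand-ins chosen for illustration, and the real handlers work with Flink's own request and response types.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class AsyncSavepointSketch {

    // Hypothetical stand-in for the gateway call that starts a savepoint
    // and eventually completes with the savepoint location.
    interface SavepointGateway {
        CompletableFuture<String> triggerSavepoint(String targetDirectory);
    }

    // Pending and finished operations, keyed by the id handed back to the client.
    private final Map<String, CompletableFuture<String>> operations = new ConcurrentHashMap<>();

    private final SavepointGateway gateway;

    AsyncSavepointSketch(SavepointGateway gateway) {
        this.gateway = gateway;
    }

    // "Trigger" side: start the savepoint and return an id immediately, without waiting.
    String trigger(String targetDirectory) {
        String triggerId = UUID.randomUUID().toString();
        operations.put(triggerId, gateway.triggerSavepoint(targetDirectory));
        return triggerId;
    }

    // "Status" side: report progress for a previously returned id.
    String status(String triggerId) {
        CompletableFuture<String> operation = operations.get(triggerId);
        if (operation == null) {
            return "UNKNOWN";
        }
        if (!operation.isDone()) {
            return "IN_PROGRESS";
        }
        if (operation.isCompletedExceptionally()) {
            return "FAILED";
        }
        return "COMPLETED: " + operation.join();
    }

    public static void main(String[] args) {
        // The future is completed by hand here to simulate a slow savepoint.
        CompletableFuture<String> savepoint = new CompletableFuture<>();
        AsyncSavepointSketch handlers = new AsyncSavepointSketch(directory -> savepoint);

        String triggerId = handlers.trigger("/tmp");
        System.out.println(handlers.status(triggerId));

        savepoint.complete("/tmp/savepoint-example");
        System.out.println(handlers.status(triggerId));
    }
}
```

Returning an id right away keeps the HTTP request short even when the savepoint itself takes a long time; the client then polls the status side until the operation has finished.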