[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5701 ---
[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5701#discussion_r175755400 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java --- @@ -194,49 +187,35 @@ protected final void restoreAndExecute( String savepointPath, Tuple2... expectedAccumulators) throws Exception { - // Retrieve the job manager - Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft()); + ClusterClient client = miniClusterResource.getClusterClient(); + client.setDetached(true); // Submit the job JobGraph jobGraph = env.getStreamGraph().getJobGraph(); jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath)); - JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph); - - StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration()); - JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID()); + JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader()); boolean done = false; while (DEADLINE.hasTimeLeft()) { // try and get a job result, this will fail if the job already failed. 
Use this // to get out of this loop JobID jobId = jobSubmissionResult.getJobID(); - FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS); try { + CompletableFuture jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID()); - Future future = clusterClient - .getJobManagerGateway() - .ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout); - - Object result = Await.result(future, timeout); + JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS); - if (result instanceof JobManagerMessages.CurrentJobStatus) { - if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) { - Object jobResult = Await.result( - jobListeningContext.getJobResultFuture(), - Duration.apply(5, TimeUnit.SECONDS)); - fail("Job failed: " + jobResult); - } - } + assertNotEquals(JobStatus.FAILED, jobStatus); } catch (Exception e) { fail("Could not connect to job: " + e); } Thread.sleep(100); --- End diff -- True, you're right. Sorry I didn't look closely enough at the test case. Forget my comments here :-) ---
[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5701#discussion_r175751892 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java --- @@ -194,49 +187,35 @@ protected final void restoreAndExecute( String savepointPath, Tuple2... expectedAccumulators) throws Exception { - // Retrieve the job manager - Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft()); + ClusterClient client = miniClusterResource.getClusterClient(); + client.setDetached(true); // Submit the job JobGraph jobGraph = env.getStreamGraph().getJobGraph(); jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath)); - JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph); - - StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration()); - JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID()); + JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader()); boolean done = false; while (DEADLINE.hasTimeLeft()) { // try and get a job result, this will fail if the job already failed. 
Use this // to get out of this loop JobID jobId = jobSubmissionResult.getJobID(); - FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS); try { + CompletableFuture jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID()); - Future future = clusterClient - .getJobManagerGateway() - .ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout); - - Object result = Await.result(future, timeout); + JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS); - if (result instanceof JobManagerMessages.CurrentJobStatus) { - if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) { - Object jobResult = Await.result( - jobListeningContext.getJobResultFuture(), - Duration.apply(5, TimeUnit.SECONDS)); - fail("Job failed: " + jobResult); - } - } + assertNotEquals(JobStatus.FAILED, jobStatus); } catch (Exception e) { fail("Could not connect to job: " + e); } Thread.sleep(100); --- End diff -- If the job reaches `JobStatus.FAILED` the test fails: `assertNotEquals(JobStatus.FAILED, jobStatus);` We keep polling the accumulators in a loop as long as the job has not failed and the deadline has not been reached. ---
[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5701#discussion_r175747950 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java --- @@ -194,49 +187,35 @@ protected final void restoreAndExecute( String savepointPath, Tuple2... expectedAccumulators) throws Exception { - // Retrieve the job manager - Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft()); + ClusterClient client = miniClusterResource.getClusterClient(); + client.setDetached(true); // Submit the job JobGraph jobGraph = env.getStreamGraph().getJobGraph(); jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath)); - JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph); - - StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration()); - JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID()); + JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader()); boolean done = false; while (DEADLINE.hasTimeLeft()) { // try and get a job result, this will fail if the job already failed. 
Use this // to get out of this loop JobID jobId = jobSubmissionResult.getJobID(); - FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS); try { + CompletableFuture jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID()); - Future future = clusterClient - .getJobManagerGateway() - .ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout); - - Object result = Await.result(future, timeout); + JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS); - if (result instanceof JobManagerMessages.CurrentJobStatus) { - if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) { - Object jobResult = Await.result( - jobListeningContext.getJobResultFuture(), - Duration.apply(5, TimeUnit.SECONDS)); - fail("Job failed: " + jobResult); - } - } + assertNotEquals(JobStatus.FAILED, jobStatus); } catch (Exception e) { fail("Could not connect to job: " + e); } Thread.sleep(100); --- End diff -- How so? Don't we call it once after the job has reached the `JobStatus.FAILED` test? Maybe it is actually not needed and can be removed. ---
[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5701#discussion_r175747586 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java --- @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; + +/** + * A report about the current values of all accumulators of the TaskExecutor for a given job. + */ +public class AccumulatorReport implements Serializable, Iterable { --- End diff -- I think `SlotReport` is wrongly implementing this interface. ---
[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5701#discussion_r175735031 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java --- @@ -194,49 +187,35 @@ protected final void restoreAndExecute( String savepointPath, Tuple2... expectedAccumulators) throws Exception { - // Retrieve the job manager - Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft()); + ClusterClient client = miniClusterResource.getClusterClient(); + client.setDetached(true); // Submit the job JobGraph jobGraph = env.getStreamGraph().getJobGraph(); jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath)); - JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph); - - StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration()); - JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID()); + JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader()); boolean done = false; while (DEADLINE.hasTimeLeft()) { // try and get a job result, this will fail if the job already failed. 
Use this // to get out of this loop JobID jobId = jobSubmissionResult.getJobID(); - FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS); try { + CompletableFuture jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID()); - Future future = clusterClient - .getJobManagerGateway() - .ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout); - - Object result = Await.result(future, timeout); + JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS); - if (result instanceof JobManagerMessages.CurrentJobStatus) { - if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) { - Object jobResult = Await.result( - jobListeningContext.getJobResultFuture(), - Duration.apply(5, TimeUnit.SECONDS)); - fail("Job failed: " + jobResult); - } - } + assertNotEquals(JobStatus.FAILED, jobStatus); } catch (Exception e) { fail("Could not connect to job: " + e); } Thread.sleep(100); --- End diff -- It is probably meant to prevent the test from spamming the cluster with accumulator requests. ---
[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5701#discussion_r175733781 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java --- @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; + +/** + * A report about the current values of all accumulators of the TaskExecutor for a given job. + */ +public class AccumulatorReport implements Serializable, Iterable { --- End diff -- I can change it. I followed the design of `SlotReport`, which also implements `Iterable` even though it just holds a collection. ---
[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/5701#discussion_r175733627 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java --- @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; + +/** + * A report about the current values of all accumulators of the TaskExecutor for a given job. + */ +public class AccumulatorReport implements Serializable, Iterable { + private final List accumulatorSnapshots; --- End diff -- sure ---
[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5701#discussion_r175731028 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java --- @@ -87,17 +89,24 @@ protected static String getResourceFilename(String filename) { return resource.getFile(); } - @Before - public void setup() throws Exception { + private Configuration getConfigurationSafe() { --- End diff -- I think we can get rid of this method by moving the `miniClusterResource` initialization into the constructor, which can declare that it throws an `Exception`. ---
[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5701#discussion_r175731826 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java --- @@ -194,49 +187,35 @@ protected final void restoreAndExecute( String savepointPath, Tuple2... expectedAccumulators) throws Exception { - // Retrieve the job manager - Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft()); + ClusterClient client = miniClusterResource.getClusterClient(); + client.setDetached(true); --- End diff -- As a side note (out of scope for this issue): I think we should deprecate `ClusterClient#setDetached`. Whether a job runs detached should not be an attribute of the client but rather of how you submit the job. ---
[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5701#discussion_r175731427 --- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java --- @@ -194,49 +187,35 @@ protected final void restoreAndExecute( String savepointPath, Tuple2... expectedAccumulators) throws Exception { - // Retrieve the job manager - Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft()); + ClusterClient client = miniClusterResource.getClusterClient(); + client.setDetached(true); // Submit the job JobGraph jobGraph = env.getStreamGraph().getJobGraph(); jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath)); - JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph); - - StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration()); - JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID()); + JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader()); boolean done = false; while (DEADLINE.hasTimeLeft()) { // try and get a job result, this will fail if the job already failed. 
Use this // to get out of this loop JobID jobId = jobSubmissionResult.getJobID(); - FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS); try { + CompletableFuture jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID()); - Future future = clusterClient - .getJobManagerGateway() - .ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout); - - Object result = Await.result(future, timeout); + JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS); - if (result instanceof JobManagerMessages.CurrentJobStatus) { - if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) { - Object jobResult = Await.result( - jobListeningContext.getJobResultFuture(), - Duration.apply(5, TimeUnit.SECONDS)); - fail("Job failed: " + jobResult); - } - } + assertNotEquals(JobStatus.FAILED, jobStatus); } catch (Exception e) { fail("Could not connect to job: " + e); } Thread.sleep(100); --- End diff -- What do we need this sleep for? ---
[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5701#discussion_r175730158 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java --- @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; + +/** + * A report about the current values of all accumulators of the TaskExecutor for a given job. + */ +public class AccumulatorReport implements Serializable, Iterable { --- End diff -- I think it would be better not to implement `Iterable`. `AccumulatorReport` is a value class which contains a `Collection` of `AccumulatorSnapshot`s, and it should stay as simple as that. Letting this class implement the `Iterable` interface means that `AccumulatorReport` can be used wherever an `Iterable` is accepted, which I think should not be the case for this class. My concern is that we only let it implement this interface to cut a corner when iterating over the list of `AccumulatorSnapshot`s. ---
[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5701#discussion_r175729168 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java --- @@ -1515,8 +1516,22 @@ public void reportPayload(ResourceID resourceID, Void payload) { } @Override - public CompletableFuture retrievePayload(ResourceID resourceID) { - return CompletableFuture.completedFuture(null); + public CompletableFuture retrievePayload(ResourceID resourceID) { --- End diff -- Let's add `validateRunsInMainThread` as a first statement. That way we enforce that this method really runs in the main thread context. ---
[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/5701#discussion_r175727728 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java --- @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; + +import java.io.Serializable; +import java.util.Iterator; +import java.util.List; + +/** + * A report about the current values of all accumulators of the TaskExecutor for a given job. + */ +public class AccumulatorReport implements Serializable, Iterable { + private final List accumulatorSnapshots; --- End diff -- This could also be a `Collection`, right? ---