[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

2018-03-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5701


---


[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5701#discussion_r175755400
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---
@@ -194,49 +187,35 @@ protected final void restoreAndExecute(
 			String savepointPath,
 			Tuple2... expectedAccumulators) throws Exception {
 
-		// Retrieve the job manager
-		Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
+		ClusterClient client = miniClusterResource.getClusterClient();
+		client.setDetached(true);
 
 		// Submit the job
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
 		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-		JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
-
-		StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
-		JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID());
+		JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader());
 
 		boolean done = false;
 		while (DEADLINE.hasTimeLeft()) {
 
 			// try and get a job result, this will fail if the job already failed. Use this
 			// to get out of this loop
 			JobID jobId = jobSubmissionResult.getJobID();
-			FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS);
 
 			try {
+				CompletableFuture jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID());
 
-				Future future = clusterClient
-					.getJobManagerGateway()
-					.ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout);
-
-				Object result = Await.result(future, timeout);
+				JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS);
 
-				if (result instanceof JobManagerMessages.CurrentJobStatus) {
-					if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) {
-						Object jobResult = Await.result(
-							jobListeningContext.getJobResultFuture(),
-							Duration.apply(5, TimeUnit.SECONDS));
-						fail("Job failed: " + jobResult);
-					}
-				}
+				assertNotEquals(JobStatus.FAILED, jobStatus);
 			} catch (Exception e) {
 				fail("Could not connect to job: " + e);
 			}
 
 			Thread.sleep(100);
--- End diff --

True, you're right. Sorry I didn't look closely enough at the test case. 
Forget my comments here :-)


---


[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

2018-03-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5701#discussion_r175751892
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---
@@ -194,49 +187,35 @@ protected final void restoreAndExecute(
 			String savepointPath,
 			Tuple2... expectedAccumulators) throws Exception {
 
-		// Retrieve the job manager
-		Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
+		ClusterClient client = miniClusterResource.getClusterClient();
+		client.setDetached(true);
 
 		// Submit the job
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
 		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-		JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
-
-		StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
-		JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID());
+		JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader());
 
 		boolean done = false;
 		while (DEADLINE.hasTimeLeft()) {
 
 			// try and get a job result, this will fail if the job already failed. Use this
 			// to get out of this loop
 			JobID jobId = jobSubmissionResult.getJobID();
-			FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS);
 
 			try {
+				CompletableFuture jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID());
 
-				Future future = clusterClient
-					.getJobManagerGateway()
-					.ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout);
-
-				Object result = Await.result(future, timeout);
+				JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS);
 
-				if (result instanceof JobManagerMessages.CurrentJobStatus) {
-					if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) {
-						Object jobResult = Await.result(
-							jobListeningContext.getJobResultFuture(),
-							Duration.apply(5, TimeUnit.SECONDS));
-						fail("Job failed: " + jobResult);
-					}
-				}
+				assertNotEquals(JobStatus.FAILED, jobStatus);
 			} catch (Exception e) {
 				fail("Could not connect to job: " + e);
 			}
 
 			Thread.sleep(100);
--- End diff --

If the job reaches `JobStatus.FAILED`, the test fails via `assertNotEquals(JobStatus.FAILED, jobStatus);`.

We poll the accumulators in a loop for as long as the job has not failed and the deadline has not been reached.
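
For illustration, a minimal, self-contained sketch of that polling pattern (the `fetchJobStatus`/`fetchAccumulators` callables and the `JobStatus` enum are hypothetical stand-ins for the `ClusterClient` calls, not the actual `SavepointMigrationTestBase` code):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.Callable;

import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail;

/** Stand-in for Flink's JobStatus. */
enum JobStatus { CREATED, RUNNING, FAILED, FINISHED }

public class AccumulatorPollingSketch {

	/**
	 * Polls the job status and the accumulators until the expected values
	 * show up, the job fails, or the deadline expires.
	 */
	static void waitForAccumulators(
			Callable<JobStatus> fetchJobStatus,
			Callable<Map<String, Integer>> fetchAccumulators,
			Map<String, Integer> expectedAccumulators,
			Duration deadline) throws Exception {

		Instant stop = Instant.now().plus(deadline);

		while (Instant.now().isBefore(stop)) {
			// The test fails as soon as the job reaches FAILED.
			assertNotEquals(JobStatus.FAILED, fetchJobStatus.call());

			// Otherwise check whether all expected accumulator values are present.
			Map<String, Integer> current = fetchAccumulators.call();
			if (current.entrySet().containsAll(expectedAccumulators.entrySet())) {
				return; // success
			}

			// Back off briefly so the cluster is not flooded with requests.
			Thread.sleep(100);
		}

		fail("Did not see the expected accumulator values before the deadline expired.");
	}
}
```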


---


[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5701#discussion_r175747950
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---
@@ -194,49 +187,35 @@ protected final void restoreAndExecute(
 			String savepointPath,
 			Tuple2... expectedAccumulators) throws Exception {
 
-		// Retrieve the job manager
-		Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
+		ClusterClient client = miniClusterResource.getClusterClient();
+		client.setDetached(true);
 
 		// Submit the job
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
 		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-		JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
-
-		StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
-		JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID());
+		JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader());
 
 		boolean done = false;
 		while (DEADLINE.hasTimeLeft()) {
 
 			// try and get a job result, this will fail if the job already failed. Use this
 			// to get out of this loop
 			JobID jobId = jobSubmissionResult.getJobID();
-			FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS);
 
 			try {
+				CompletableFuture jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID());
 
-				Future future = clusterClient
-					.getJobManagerGateway()
-					.ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout);
-
-				Object result = Await.result(future, timeout);
+				JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS);
 
-				if (result instanceof JobManagerMessages.CurrentJobStatus) {
-					if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) {
-						Object jobResult = Await.result(
-							jobListeningContext.getJobResultFuture(),
-							Duration.apply(5, TimeUnit.SECONDS));
-						fail("Job failed: " + jobResult);
-					}
-				}
+				assertNotEquals(JobStatus.FAILED, jobStatus);
 			} catch (Exception e) {
 				fail("Could not connect to job: " + e);
 			}
 
 			Thread.sleep(100);
--- End diff --

How so? Don't we call it once after the job has reached the `JobStatus.FAILED` state? Maybe it is actually not needed and can be removed.


---


[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5701#discussion_r175747586
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A report about the current values of all accumulators of the TaskExecutor for a given job.
+ */
+public class AccumulatorReport implements Serializable, Iterable {
--- End diff --

I think `SlotReport` is wrongly implementing this interface.


---


[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

2018-03-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5701#discussion_r175735031
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---
@@ -194,49 +187,35 @@ protected final void restoreAndExecute(
 			String savepointPath,
 			Tuple2... expectedAccumulators) throws Exception {
 
-		// Retrieve the job manager
-		Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
+		ClusterClient client = miniClusterResource.getClusterClient();
+		client.setDetached(true);
 
 		// Submit the job
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
 		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-		JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
-
-		StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
-		JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID());
+		JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader());
 
 		boolean done = false;
 		while (DEADLINE.hasTimeLeft()) {
 
 			// try and get a job result, this will fail if the job already failed. Use this
 			// to get out of this loop
 			JobID jobId = jobSubmissionResult.getJobID();
-			FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS);
 
 			try {
+				CompletableFuture jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID());
 
-				Future future = clusterClient
-					.getJobManagerGateway()
-					.ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout);
-
-				Object result = Await.result(future, timeout);
+				JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS);
 
-				if (result instanceof JobManagerMessages.CurrentJobStatus) {
-					if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) {
-						Object jobResult = Await.result(
-							jobListeningContext.getJobResultFuture(),
-							Duration.apply(5, TimeUnit.SECONDS));
-						fail("Job failed: " + jobResult);
-					}
-				}
+				assertNotEquals(JobStatus.FAILED, jobStatus);
 			} catch (Exception e) {
 				fail("Could not connect to job: " + e);
 			}
 
 			Thread.sleep(100);
--- End diff --

It's probably meant to prevent the test from spamming the cluster with accumulator requests.


---


[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

2018-03-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5701#discussion_r175733781
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A report about the current values of all accumulators of the TaskExecutor for a given job.
+ */
+public class AccumulatorReport implements Serializable, Iterable {
--- End diff --

I can change it. I followed the design of `SlotReport`, which also implements `Iterable` even though it just holds a collection.
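
For reference, the delegating pattern described here as a standalone sketch (generic placeholder names, not the actual `SlotReport` code):

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** A report that implements Iterable simply by delegating to its internal list. */
public class DelegatingReportSketch<T> implements Serializable, Iterable<T> {

	private static final long serialVersionUID = 1L;

	private final ArrayList<T> entries;

	public DelegatingReportSketch(List<T> entries) {
		this.entries = new ArrayList<>(entries);
	}

	@Override
	public Iterator<T> iterator() {
		// Convenience: callers can write `for (T entry : report)`.
		return entries.iterator();
	}
}
```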


---


[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

2018-03-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5701#discussion_r175733627
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A report about the current values of all accumulators of the TaskExecutor for a given job.
+ */
+public class AccumulatorReport implements Serializable, Iterable {
+	private final List accumulatorSnapshots;
--- End diff --

sure


---


[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5701#discussion_r175731028
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---
@@ -87,17 +89,24 @@ protected static String getResourceFilename(String filename) {
 		return resource.getFile();
 	}
 
-	@Before
-	public void setup() throws Exception {
+	private Configuration getConfigurationSafe() {
--- End diff --

I think we can get rid of this method by moving the `miniClusterResource` initialization into the constructor, which is declared to throw an `Exception`.
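
A rough sketch of what that could look like, with hypothetical stand-in types rather than Flink's actual `MiniClusterResource` API:

```java
import org.junit.Rule;
import org.junit.rules.ExternalResource;

import java.util.Properties;

/**
 * Sketch: initialize the mini cluster resource in the test-base constructor,
 * which is declared to throw, instead of wrapping the configuration creation
 * in a getConfigurationSafe() helper that swallows checked exceptions.
 */
public abstract class ConstructorInitializedTestBaseSketch {

	@Rule
	public final MiniClusterResourceStub miniClusterResource;

	protected ConstructorInitializedTestBaseSketch() throws Exception {
		// createConfiguration() may throw; the constructor declares Exception,
		// so no try/catch wrapper is needed here.
		this.miniClusterResource = new MiniClusterResourceStub(createConfiguration());
	}

	/** Subclasses provide the (possibly failing) test configuration. */
	protected abstract Properties createConfiguration() throws Exception;

	/** Hypothetical stand-in for the mini cluster JUnit resource. */
	public static final class MiniClusterResourceStub extends ExternalResource {
		private final Properties configuration;

		public MiniClusterResourceStub(Properties configuration) {
			this.configuration = configuration;
		}

		public Properties getConfiguration() {
			return configuration;
		}
	}
}
```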


---


[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5701#discussion_r175731826
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---
@@ -194,49 +187,35 @@ protected final void restoreAndExecute(
 			String savepointPath,
 			Tuple2... expectedAccumulators) throws Exception {
 
-		// Retrieve the job manager
-		Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
+		ClusterClient client = miniClusterResource.getClusterClient();
+		client.setDetached(true);
--- End diff --

As a side note that is out of scope for this issue: I think we should deprecate `ClusterClient#setDetached`. Detached execution should not be an attribute of the client but rather a property of how you submit a job.
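
A hypothetical interface sketch of that direction, where detached vs. attached is a parameter of the submit call rather than client state (all names and types are placeholders, not Flink's actual API):

```java
import java.util.concurrent.CompletableFuture;

/**
 * Hypothetical client API sketch: detached vs. attached execution is chosen
 * per submission instead of via mutable client state such as setDetached(true).
 */
interface JobSubmitterSketch {

	enum SubmissionMode { ATTACHED, DETACHED }

	/**
	 * Submits the job. In DETACHED mode the returned future completes with the
	 * job id as soon as the job is accepted; in ATTACHED mode it completes once
	 * the job has finished.
	 */
	CompletableFuture<String> submitJob(byte[] serializedJobGraph, SubmissionMode mode);
}
```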


---


[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5701#discussion_r175731427
  
--- Diff: flink-tests/src/test/java/org/apache/flink/test/checkpointing/utils/SavepointMigrationTestBase.java ---
@@ -194,49 +187,35 @@ protected final void restoreAndExecute(
 			String savepointPath,
 			Tuple2... expectedAccumulators) throws Exception {
 
-		// Retrieve the job manager
-		Await.result(cluster.leaderGateway().future(), DEADLINE.timeLeft());
+		ClusterClient client = miniClusterResource.getClusterClient();
+		client.setDetached(true);
 
 		// Submit the job
 		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
 		jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-		JobSubmissionResult jobSubmissionResult = cluster.submitJobDetached(jobGraph);
-
-		StandaloneClusterClient clusterClient = new StandaloneClusterClient(cluster.configuration());
-		JobListeningContext jobListeningContext = clusterClient.connectToJob(jobSubmissionResult.getJobID());
+		JobSubmissionResult jobSubmissionResult = client.submitJob(jobGraph, SavepointMigrationTestBase.class.getClassLoader());
 
 		boolean done = false;
 		while (DEADLINE.hasTimeLeft()) {
 
 			// try and get a job result, this will fail if the job already failed. Use this
 			// to get out of this loop
 			JobID jobId = jobSubmissionResult.getJobID();
-			FiniteDuration timeout = FiniteDuration.apply(5, TimeUnit.SECONDS);
 
 			try {
+				CompletableFuture jobStatusFuture = client.getJobStatus(jobSubmissionResult.getJobID());
 
-				Future future = clusterClient
-					.getJobManagerGateway()
-					.ask(JobManagerMessages.getRequestJobStatus(jobSubmissionResult.getJobID()), timeout);
-
-				Object result = Await.result(future, timeout);
+				JobStatus jobStatus = jobStatusFuture.get(5, TimeUnit.SECONDS);
 
-				if (result instanceof JobManagerMessages.CurrentJobStatus) {
-					if (((JobManagerMessages.CurrentJobStatus) result).status() == JobStatus.FAILED) {
-						Object jobResult = Await.result(
-							jobListeningContext.getJobResultFuture(),
-							Duration.apply(5, TimeUnit.SECONDS));
-						fail("Job failed: " + jobResult);
-					}
-				}
+				assertNotEquals(JobStatus.FAILED, jobStatus);
 			} catch (Exception e) {
 				fail("Could not connect to job: " + e);
 			}
 
 			Thread.sleep(100);
--- End diff --

What do we need this sleep for?


---


[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5701#discussion_r175730158
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A report about the current values of all accumulators of the TaskExecutor for a given job.
+ */
+public class AccumulatorReport implements Serializable, Iterable {
--- End diff --

I think it would be better not to implement `Iterable`. `AccumulatorReport` is a value class that contains a `Collection` of `AccumulatorSnapshot`, and it should stay that simple. Letting this class implement the `Iterable` interface means that `AccumulatorReport` can be passed anywhere an `Iterable` is accepted, which I don't think should be the case for this class. My concern is that we only implement the interface to cut a corner when iterating over the list of `AccumulatorSnapshot`s.
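
A minimal sketch of the plain value class being suggested (an `AccumulatorSnapshotStub` stands in for the real `AccumulatorSnapshot`; this is an illustration, not the PR's code):

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

/** Value class holding accumulator snapshots; deliberately not an Iterable. */
public class AccumulatorReportSketch implements Serializable {

	private static final long serialVersionUID = 1L;

	private final Collection<AccumulatorSnapshotStub> accumulatorSnapshots;

	public AccumulatorReportSketch(Collection<AccumulatorSnapshotStub> accumulatorSnapshots) {
		// Defensive copy so the report behaves like a simple immutable value.
		this.accumulatorSnapshots =
			Collections.unmodifiableCollection(new ArrayList<>(accumulatorSnapshots));
	}

	/** Callers iterate via the getter instead of treating the report itself as an Iterable. */
	public Collection<AccumulatorSnapshotStub> getAccumulatorSnapshots() {
		return accumulatorSnapshots;
	}

	/** Hypothetical stand-in for org.apache.flink.runtime.accumulators.AccumulatorSnapshot. */
	public static class AccumulatorSnapshotStub implements Serializable {
		private static final long serialVersionUID = 1L;
	}
}
```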


---


[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5701#discussion_r175729168
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---
@@ -1515,8 +1516,22 @@ public void reportPayload(ResourceID resourceID, Void payload) {
 	}
 
 	@Override
-	public CompletableFuture retrievePayload(ResourceID resourceID) {
-		return CompletableFuture.completedFuture(null);
+	public CompletableFuture retrievePayload(ResourceID resourceID) {
--- End diff --

Let's add `validateRunsInMainThread()` as the first statement. That way we enforce that this method really runs in the main thread context.
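
A self-contained sketch of the main-thread-validation pattern being asked for; `MainThreadValidationSketch` and its members are hypothetical and only illustrate the enforcement idea, not the `TaskExecutor`/`RpcEndpoint` internals:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MainThreadValidationSketch {

	// Single thread that plays the role of the endpoint's main thread.
	private final ExecutorService mainThreadExecutor = Executors.newSingleThreadExecutor();

	private volatile Thread mainThread;

	public MainThreadValidationSketch() {
		// Capture the main thread once; all handlers must run on it.
		mainThreadExecutor.execute(() -> mainThread = Thread.currentThread());
	}

	void validateRunsInMainThread() {
		if (Thread.currentThread() != mainThread) {
			throw new IllegalStateException("Method must run in the endpoint's main thread");
		}
	}

	/** Payload retriever guarded by the validation, mirroring the suggestion above. */
	CompletableFuture<String> retrievePayload() {
		validateRunsInMainThread();
		return CompletableFuture.completedFuture("payload");
	}

	public static void main(String[] args) throws Exception {
		MainThreadValidationSketch sketch = new MainThreadValidationSketch();
		// Correct usage: invoke the handler from the main thread executor.
		sketch.mainThreadExecutor.submit(sketch::retrievePayload).get();
		sketch.mainThreadExecutor.shutdown();
	}
}
```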


---


[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5701#discussion_r175727728
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/AccumulatorReport.java ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
+
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * A report about the current values of all accumulators of the TaskExecutor for a given job.
+ */
+public class AccumulatorReport implements Serializable, Iterable {
+	private final List accumulatorSnapshots;
--- End diff --

This could also be a `Collection` right?


---