[FLINK-2815] [REFACTOR] Remove Pact from class and file names since it is no 
longer valid reference

Remove Pact word from class and file names in Apache Flink.
Pact was the name used in Stratosphere time to refer to concept of distributed 
datasets (similar to Flink Dataset). It was used when Pact and Nephele still 
separate concept.

As part of 0.10.0 release cleanup effort, let's remove the Pact names to avoid 
confusion.

The PR also contains small cleanups (sorry):
1. Small refactor DataSinkTask and DataSourceTask to follow Java7 generic 
convention creation new collection. Remove LOG.isDebugEnabled check.
2. Simple cleanup to update MapValue and TypeInformation with Java7 generic 
convention creation new collection.
3. Combine several exceptions that have same catch operation.

Apologize for the extra changes with PR. But I separated them into different 
commits for easier review.

Author: hsaputra <hsapu...@apache.org>

Closes #1218 from hsaputra/remove_pact_name and squashes the following commits:

b3c55b4 [hsaputra] Rename RegularTask to BatchTask per review.
e278fac [hsaputra] Address review comments from chiwanpark (good catch).
9f92f33 [hsaputra] Remove Pact from the file names of teh flink-runtime and 
flink-clients modules.
dbb2175 [hsaputra] Simple cleanup to update MapValue with Java7 generic for new 
collection. Remove unused imports in CollectionsDataTypeTest.
df2f553 [hsaputra] Use Java7 style of type resolution for new collection.
6403d44 [hsaputra] Remove the word Pact from the Javadoc for ChainedDriver.
0c562f4 [hsaputra] Small refactor on DataSinkTask and DataSourceTask classes to 
keep up with modern Java practice.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b08669ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b08669ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b08669ab

Branch: refs/heads/master
Commit: b08669abf282c52b54c395b85e992edb8ca621d4
Parents: e494c27
Author: hsaputra <hsapu...@apache.org>
Authored: Tue Oct 6 08:14:06 2015 -0700
Committer: hsaputra <hsapu...@apache.org>
Committed: Tue Oct 6 08:14:06 2015 -0700

----------------------------------------------------------------------
 .../apache/flink/client/web/JobJSONServlet.java |  163 ++
 .../flink/client/web/PactJobJSONServlet.java    |  163 --
 .../flink/client/web/WebInterfaceServer.java    |    2 +-
 .../api/common/typeinfo/TypeInformation.java    |    2 +-
 .../java/org/apache/flink/types/MapValue.java   |   18 +-
 .../flink/types/CollectionsDataTypeTest.java    |    6 -
 .../plantranslate/JobGraphGenerator.java        |   30 +-
 .../src/test/resources/logback-test.xml         |    2 +-
 .../broadcast/BroadcastVariableManager.java     |   10 +-
 .../BroadcastVariableMaterialization.java       |   12 +-
 .../concurrent/SolutionSetUpdateBarrier.java    |    7 +-
 .../SolutionSetUpdateBarrierBroker.java         |    7 +-
 .../task/AbstractIterativePactTask.java         |  395 -----
 .../iterative/task/AbstractIterativeTask.java   |  395 +++++
 .../iterative/task/IterationHeadPactTask.java   |  440 -----
 .../iterative/task/IterationHeadTask.java       |  441 ++++++
 .../task/IterationIntermediatePactTask.java     |  131 --
 .../task/IterationIntermediateTask.java         |  131 ++
 .../task/IterationSynchronizationSinkTask.java  |    4 +-
 .../iterative/task/IterationTailPactTask.java   |  140 --
 .../iterative/task/IterationTailTask.java       |  140 ++
 .../jobgraph/tasks/AbstractInvokable.java       |    3 +-
 .../AbstractCachedBuildSideJoinDriver.java      |    2 +-
 .../operators/AbstractOuterJoinDriver.java      |    6 +-
 .../operators/AllGroupCombineDriver.java        |    6 +-
 .../runtime/operators/AllGroupReduceDriver.java |    6 +-
 .../runtime/operators/AllReduceDriver.java      |    6 +-
 .../flink/runtime/operators/BatchTask.java      | 1499 ++++++++++++++++++
 .../flink/runtime/operators/CoGroupDriver.java  |    6 +-
 .../runtime/operators/CoGroupRawDriver.java     |    6 +-
 .../CoGroupWithSolutionSetFirstDriver.java      |   12 +-
 .../CoGroupWithSolutionSetSecondDriver.java     |   12 +-
 .../runtime/operators/CollectorMapDriver.java   |    6 +-
 .../flink/runtime/operators/CrossDriver.java    |    6 +-
 .../flink/runtime/operators/DataSinkTask.java   |   41 +-
 .../flink/runtime/operators/DataSourceTask.java |   58 +-
 .../apache/flink/runtime/operators/Driver.java  |   90 ++
 .../flink/runtime/operators/DriverStrategy.java |   12 +-
 .../flink/runtime/operators/FlatMapDriver.java  |    6 +-
 .../operators/GroupReduceCombineDriver.java     |    6 +-
 .../runtime/operators/GroupReduceDriver.java    |    6 +-
 .../flink/runtime/operators/JoinDriver.java     |    6 +-
 .../JoinWithSolutionSetFirstDriver.java         |   12 +-
 .../JoinWithSolutionSetSecondDriver.java        |   12 +-
 .../flink/runtime/operators/MapDriver.java      |    6 +-
 .../runtime/operators/MapPartitionDriver.java   |    6 +-
 .../flink/runtime/operators/NoOpDriver.java     |    6 +-
 .../flink/runtime/operators/PactDriver.java     |   90 --
 .../runtime/operators/PactTaskContext.java      |   70 -
 .../runtime/operators/ReduceCombineDriver.java  |    6 +-
 .../flink/runtime/operators/ReduceDriver.java   |    6 +-
 .../runtime/operators/RegularPactTask.java      | 1499 ------------------
 .../runtime/operators/ResettableDriver.java     |   44 +
 .../runtime/operators/ResettablePactDriver.java |   44 -
 .../flink/runtime/operators/TaskContext.java    |   70 +
 .../operators/UnionWithTempOperator.java        |    6 +-
 .../chaining/ChainedAllReduceDriver.java        |    8 +-
 .../chaining/ChainedCollectorMapDriver.java     |    8 +-
 .../operators/chaining/ChainedDriver.java       |    8 +-
 .../chaining/ChainedFlatMapDriver.java          |    8 +-
 .../operators/chaining/ChainedMapDriver.java    |    8 +-
 .../chaining/GroupCombineChainedDriver.java     |    8 +-
 .../SynchronousChainedCombineDriver.java        |    8 +-
 .../runtime/operators/util/TaskConfig.java      |    8 +-
 .../TaskDeploymentDescriptorTest.java           |    4 +-
 .../ExecutionGraphDeploymentTest.java           |   16 +-
 .../operators/chaining/ChainTaskTest.java       |   10 +-
 .../operators/drivers/TestTaskContext.java      |    4 +-
 .../testutils/BinaryOperatorTestBase.java       |   20 +-
 .../operators/testutils/DriverTestBase.java     |   20 +-
 .../operators/testutils/TaskTestBase.java       |    4 +-
 .../testutils/UnaryOperatorTestBase.java        |   20 +-
 .../src/test/resources/logback-test.xml         |    2 +-
 .../src/test/resources/logback-test.xml         |    2 +-
 .../flink/tez/runtime/RegularProcessor.java     |    6 +-
 .../org/apache/flink/tez/runtime/TezTask.java   |   18 +-
 .../src/test/resources/logback-test.xml         |    2 +-
 flink-tests/src/test/resources/logback-test.xml |    2 +-
 78 files changed, 3236 insertions(+), 3275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-clients/src/main/java/org/apache/flink/client/web/JobJSONServlet.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/web/JobJSONServlet.java 
b/flink-clients/src/main/java/org/apache/flink/client/web/JobJSONServlet.java
new file mode 100644
index 0000000..77250d3
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/web/JobJSONServlet.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.client.web;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.client.program.PackagedProgram;
+
+
+public class JobJSONServlet extends HttpServlet {
+       
+       /** Serial UID for serialization interoperability. */
+       private static final long serialVersionUID = 558077298726449201L;
+       
+       private static final Logger LOG = 
LoggerFactory.getLogger(JobJSONServlet.class);
+
+       // 
------------------------------------------------------------------------
+
+       private static final String JOB_PARAM_NAME = "job";
+
+       private static final String CLASS_PARAM_NAME = "assemblerClass";
+
+       // 
------------------------------------------------------------------------
+
+       private final File jobStoreDirectory; // the directory in which the 
jobs are stored
+
+       public JobJSONServlet(File jobStoreDirectory) {
+               this.jobStoreDirectory = jobStoreDirectory;
+       }
+
+       @Override
+       protected void doGet(HttpServletRequest req, HttpServletResponse resp) 
throws ServletException, IOException {
+               resp.setContentType("application/json");
+
+               String jobName = req.getParameter(JOB_PARAM_NAME);
+               if (jobName == null) {
+                       LOG.warn("Received request without job parameter 
name.");
+                       resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+                       return;
+               }
+
+               // check, if the jar exists
+               File jarFile = new File(jobStoreDirectory, jobName);
+               if (!jarFile.exists()) {
+                       LOG.warn("Received request for non-existing jar file.");
+                       resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+                       return;
+               }
+
+               // create the pact plan
+               PackagedProgram pactProgram;
+               try {
+                       pactProgram = new PackagedProgram(jarFile, 
req.getParameter(CLASS_PARAM_NAME), new String[0]);
+               }
+               catch (Throwable t) {
+                       LOG.info("Instantiating the PactProgram for '" + 
jarFile.getName() + "' failed.", t);
+                       resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+                       resp.getWriter().print(t.getMessage());
+                       return;
+               }
+               
+               String jsonPlan = null;
+               String programDescription = null;
+               
+               try {
+                       jsonPlan = pactProgram.getPreviewPlan();
+               }
+               catch (Throwable t) {
+                       LOG.error("Failed to create json dump of pact 
program.", t);
+               }
+               
+               try {
+                       programDescription = pactProgram.getDescription();
+               }
+               catch (Throwable t) {
+                       LOG.error("Failed to create description of pact 
program.", t);
+               }
+                       
+               if (jsonPlan == null && programDescription == null) {
+                       resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+                       return;
+               } else {
+                       resp.setStatus(HttpServletResponse.SC_OK);
+                       PrintWriter wrt = resp.getWriter();
+                       wrt.print("{ \"jobname\": \"");
+                       wrt.print(jobName);
+                       if (jsonPlan != null) {
+                               wrt.print("\", \"plan\": ");
+                               wrt.println(jsonPlan);
+                       }
+                       if (programDescription != null) {
+                               wrt.print(", \"description\": \"");
+                               wrt.print(escapeString(programDescription));
+                               wrt.print("\"");
+                       }
+                       
+                       wrt.println("}");
+               }
+       }
+
+       protected String escapeString(String str) {
+               int len = str.length();
+               char[] s = str.toCharArray();
+               StringBuilder sb = new StringBuilder();
+
+               for (int i = 0; i < len; i += 1) {
+                       char c = s[i];
+                       if ((c == '\\') || (c == '"') || (c == '/')) {
+                               sb.append('\\');
+                               sb.append(c);
+                       }
+                       else if (c == '\b') {
+                               sb.append("\\b");
+                       } else if (c == '\t') {
+                               sb.append("\\t");
+                       } else if (c == '\n') {
+                               sb.append("<br>");
+                       } else if (c == '\f') {
+                               sb.append("\\f");
+                       } else if (c == '\r') {
+                               sb.append("\\r");
+                       } else if (c == '>') {
+                               sb.append("&gt;");
+                       } else if (c == '<') {
+                               sb.append("&lt;");
+                       } else {
+                               if (c < ' ') {
+                                       // Unreadable throw away
+                               } else {
+                                       sb.append(c);
+                               }
+                       }
+               }
+
+               return sb.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
 
b/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
deleted file mode 100644
index 019fc50..0000000
--- 
a/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.client.web;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.client.program.PackagedProgram;
-
-
-public class PactJobJSONServlet extends HttpServlet {
-       
-       /** Serial UID for serialization interoperability. */
-       private static final long serialVersionUID = 558077298726449201L;
-       
-       private static final Logger LOG = 
LoggerFactory.getLogger(PactJobJSONServlet.class);
-
-       // 
------------------------------------------------------------------------
-
-       private static final String JOB_PARAM_NAME = "job";
-
-       private static final String CLASS_PARAM_NAME = "assemblerClass";
-
-       // 
------------------------------------------------------------------------
-
-       private final File jobStoreDirectory; // the directory in which the 
jobs are stored
-
-       public PactJobJSONServlet(File jobStoreDirectory) {
-               this.jobStoreDirectory = jobStoreDirectory;
-       }
-
-       @Override
-       protected void doGet(HttpServletRequest req, HttpServletResponse resp) 
throws ServletException, IOException {
-               resp.setContentType("application/json");
-
-               String jobName = req.getParameter(JOB_PARAM_NAME);
-               if (jobName == null) {
-                       LOG.warn("Received request without job parameter 
name.");
-                       resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-                       return;
-               }
-
-               // check, if the jar exists
-               File jarFile = new File(jobStoreDirectory, jobName);
-               if (!jarFile.exists()) {
-                       LOG.warn("Received request for non-existing jar file.");
-                       resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-                       return;
-               }
-
-               // create the pact plan
-               PackagedProgram pactProgram;
-               try {
-                       pactProgram = new PackagedProgram(jarFile, 
req.getParameter(CLASS_PARAM_NAME), new String[0]);
-               }
-               catch (Throwable t) {
-                       LOG.info("Instantiating the PactProgram for '" + 
jarFile.getName() + "' failed.", t);
-                       resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-                       resp.getWriter().print(t.getMessage());
-                       return;
-               }
-               
-               String jsonPlan = null;
-               String programDescription = null;
-               
-               try {
-                       jsonPlan = pactProgram.getPreviewPlan();
-               }
-               catch (Throwable t) {
-                       LOG.error("Failed to create json dump of pact 
program.", t);
-               }
-               
-               try {
-                       programDescription = pactProgram.getDescription();
-               }
-               catch (Throwable t) {
-                       LOG.error("Failed to create description of pact 
program.", t);
-               }
-                       
-               if (jsonPlan == null && programDescription == null) {
-                       resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-                       return;
-               } else {
-                       resp.setStatus(HttpServletResponse.SC_OK);
-                       PrintWriter wrt = resp.getWriter();
-                       wrt.print("{ \"jobname\": \"");
-                       wrt.print(jobName);
-                       if (jsonPlan != null) {
-                               wrt.print("\", \"plan\": ");
-                               wrt.println(jsonPlan);
-                       }
-                       if (programDescription != null) {
-                               wrt.print(", \"description\": \"");
-                               wrt.print(escapeString(programDescription));
-                               wrt.print("\"");
-                       }
-                       
-                       wrt.println("}");
-               }
-       }
-
-       protected String escapeString(String str) {
-               int len = str.length();
-               char[] s = str.toCharArray();
-               StringBuilder sb = new StringBuilder();
-
-               for (int i = 0; i < len; i += 1) {
-                       char c = s[i];
-                       if ((c == '\\') || (c == '"') || (c == '/')) {
-                               sb.append('\\');
-                               sb.append(c);
-                       }
-                       else if (c == '\b') {
-                               sb.append("\\b");
-                       } else if (c == '\t') {
-                               sb.append("\\t");
-                       } else if (c == '\n') {
-                               sb.append("<br>");
-                       } else if (c == '\f') {
-                               sb.append("\\f");
-                       } else if (c == '\r') {
-                               sb.append("\\r");
-                       } else if (c == '>') {
-                               sb.append("&gt;");
-                       } else if (c == '<') {
-                               sb.append("&lt;");
-                       } else {
-                               if (c < ' ') {
-                                       // Unreadable throw away
-                               } else {
-                                       sb.append(c);
-                               }
-                       }
-               }
-
-               return sb.toString();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
 
b/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
index bab3cf7..68a0706 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
@@ -144,7 +144,7 @@ public class WebInterfaceServer {
                CliFrontend cli = new CliFrontend(configDir);
                ServletContextHandler servletContext = new 
ServletContextHandler(ServletContextHandler.SESSIONS);
                servletContext.setContextPath("/");
-               servletContext.addServlet(new ServletHolder(new 
PactJobJSONServlet(uploadDir)), "/pactPlan");
+               servletContext.addServlet(new ServletHolder(new 
JobJSONServlet(uploadDir)), "/pactPlan");
                servletContext.addServlet(new ServletHolder(new 
PlanDisplayServlet(jobManagerWebPort)), "/showPlan");
                servletContext.addServlet(new ServletHolder(new 
JobsServlet(uploadDir, tmpDir, "launch.html")), "/jobs");
                servletContext.addServlet(new ServletHolder(new 
JobSubmissionServlet(cli, uploadDir, planDumpDir)), "/runJob");

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
index 309c968..07d8544 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
@@ -119,7 +119,7 @@ public abstract class TypeInformation<T> implements 
Serializable {
         */
        public List<TypeInformation<?>> getGenericParameters() {
                // Return an empty list as the default implementation
-               return new LinkedList<TypeInformation<?>>();
+               return new LinkedList<>();
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-core/src/main/java/org/apache/flink/types/MapValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/MapValue.java 
b/flink-core/src/main/java/org/apache/flink/types/MapValue.java
index ed5b4e1..a6cafd2 100644
--- a/flink-core/src/main/java/org/apache/flink/types/MapValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/MapValue.java
@@ -31,7 +31,7 @@ import org.apache.flink.util.ReflectionUtil;
 
 /**
  * Generic map base type for PACT programs that implements the Value and Map 
interfaces.
- * PactMap encapsulates a Java HashMap object.
+ * The {@link MapValue} encapsulates a Java {@link HashMap} object.
  * 
  * @see org.apache.flink.types.Value
  * @see java.util.Map
@@ -56,10 +56,10 @@ public abstract class MapValue<K extends Value, V extends 
Value> implements Valu
         * Initializes the encapsulated map with an empty HashMap.
         */
        public MapValue() {
-               this.keyClass = ReflectionUtil.<K> 
getTemplateType1(this.getClass());
-               this.valueClass = ReflectionUtil.<V> 
getTemplateType2(this.getClass());
+               this.keyClass = 
ReflectionUtil.getTemplateType1(this.getClass());
+               this.valueClass = 
ReflectionUtil.getTemplateType2(this.getClass());
 
-               this.map = new HashMap<K, V>();
+               this.map = new HashMap<>();
        }
 
        /**
@@ -68,10 +68,10 @@ public abstract class MapValue<K extends Value, V extends 
Value> implements Valu
         * @param map Map holding all entries with which the new encapsulated 
map is filled.
         */
        public MapValue(Map<K, V> map) {
-               this.keyClass = ReflectionUtil.<K> 
getTemplateType1(this.getClass());
-               this.valueClass = ReflectionUtil.<V> 
getTemplateType2(this.getClass());
+               this.keyClass = 
ReflectionUtil.getTemplateType1(this.getClass());
+               this.valueClass = 
ReflectionUtil.getTemplateType2(this.getClass());
 
-               this.map = new HashMap<K, V>(map);
+               this.map = new HashMap<>(map);
        }
 
        @Override
@@ -87,9 +87,7 @@ public abstract class MapValue<K extends Value, V extends 
Value> implements Valu
                                val.read(in);
                                this.map.put(key, val);
                        }
-               } catch (final InstantiationException e) {
-                       throw new RuntimeException(e);
-               } catch (final IllegalAccessException e) {
+               } catch (final InstantiationException | IllegalAccessException 
e) {
                        throw new RuntimeException(e);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java 
b/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java
index b631799..5c81e4a 100644
--- 
a/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java
@@ -32,12 +32,6 @@ import org.junit.Assert;
 
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.ListValue;
-import org.apache.flink.types.MapValue;
-import org.apache.flink.types.Pair;
-import org.apache.flink.types.StringValue;
 import org.junit.Before;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 3567fad..c15e47a 100644
--- 
a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -55,10 +55,10 @@ import org.apache.flink.optimizer.util.Utils;
 import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import 
org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
-import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
-import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
+import org.apache.flink.runtime.iterative.task.IterationHeadTask;
+import org.apache.flink.runtime.iterative.task.IterationIntermediateTask;
 import 
org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
-import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
+import org.apache.flink.runtime.iterative.task.IterationTailTask;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -66,17 +66,17 @@ import org.apache.flink.runtime.jobgraph.InputFormatVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.CoGroupDriver;
 import org.apache.flink.runtime.operators.CoGroupWithSolutionSetFirstDriver;
 import org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver;
 import org.apache.flink.runtime.operators.DataSinkTask;
 import org.apache.flink.runtime.operators.DataSourceTask;
 import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.JoinDriver;
 import org.apache.flink.runtime.operators.JoinWithSolutionSetFirstDriver;
 import org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver;
-import org.apache.flink.runtime.operators.JoinDriver;
 import org.apache.flink.runtime.operators.NoOpDriver;
-import org.apache.flink.runtime.operators.RegularPactTask;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -829,7 +829,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> 
{
                } else {
                        // create task vertex
                        vertex = new JobVertex(taskName);
-                       vertex.setInvokableClass((this.currentIteration != null 
&& node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : 
RegularPactTask.class);
+                       vertex.setInvokableClass((this.currentIteration != null 
&& node.isOnDynamicPath()) ? IterationIntermediateTask.class : BatchTask.class);
                        
                        config = new TaskConfig(vertex.getConfiguration());
                        config.setDriver(ds.getDriverClass());
@@ -854,7 +854,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> 
{
                final DriverStrategy ds = node.getDriverStrategy();
                final JobVertex vertex = new JobVertex(taskName);
                final TaskConfig config = new 
TaskConfig(vertex.getConfiguration());
-               vertex.setInvokableClass( (this.currentIteration != null && 
node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : 
RegularPactTask.class);
+               vertex.setInvokableClass( (this.currentIteration != null && 
node.isOnDynamicPath()) ? IterationIntermediateTask.class : BatchTask.class);
                
                // set user code
                
config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
@@ -951,7 +951,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> 
{
                        }
                        
                        // reset the vertex type to iteration head
-                       
headVertex.setInvokableClass(IterationHeadPactTask.class);
+                       headVertex.setInvokableClass(IterationHeadTask.class);
                        headConfig = new 
TaskConfig(headVertex.getConfiguration());
                        toReturn = null;
                } else {
@@ -959,7 +959,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> 
{
                        // everything else happens in the post visit, after the 
input (the initial partial solution)
                        // is connected.
                        headVertex = new JobVertex("PartialSolution 
("+iteration.getNodeName()+")");
-                       
headVertex.setInvokableClass(IterationHeadPactTask.class);
+                       headVertex.setInvokableClass(IterationHeadTask.class);
                        headConfig = new 
TaskConfig(headVertex.getConfiguration());
                        headConfig.setDriver(NoOpDriver.class);
                        toReturn = headVertex;
@@ -1019,7 +1019,7 @@ public class JobGraphGenerator implements 
Visitor<PlanNode> {
                        }
                        
                        // reset the vertex type to iteration head
-                       
headVertex.setInvokableClass(IterationHeadPactTask.class);
+                       headVertex.setInvokableClass(IterationHeadTask.class);
                        headConfig = new 
TaskConfig(headVertex.getConfiguration());
                        toReturn = null;
                } else {
@@ -1027,7 +1027,7 @@ public class JobGraphGenerator implements 
Visitor<PlanNode> {
                        // everything else happens in the post visit, after the 
input (the initial partial solution)
                        // is connected.
                        headVertex = new 
JobVertex("IterationHead("+iteration.getNodeName()+")");
-                       
headVertex.setInvokableClass(IterationHeadPactTask.class);
+                       headVertex.setInvokableClass(IterationHeadTask.class);
                        headConfig = new 
TaskConfig(headVertex.getConfiguration());
                        headConfig.setDriver(NoOpDriver.class);
                        toReturn = headVertex;
@@ -1310,7 +1310,7 @@ public class JobGraphGenerator implements 
Visitor<PlanNode> {
                // No following termination criterion
                if (rootOfStepFunction.getOutgoingChannels().isEmpty()) {
                        
-                       
rootOfStepFunctionVertex.setInvokableClass(IterationTailPactTask.class);
+                       
rootOfStepFunctionVertex.setInvokableClass(IterationTailTask.class);
                        
                        
tailConfig.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
                }
@@ -1337,7 +1337,7 @@ public class JobGraphGenerator implements 
Visitor<PlanNode> {
                                tailConfigOfTerminationCriterion = new 
TaskConfig(rootOfTerminationCriterionVertex.getConfiguration());
                        }
                        
-                       
rootOfTerminationCriterionVertex.setInvokableClass(IterationTailPactTask.class);
+                       
rootOfTerminationCriterionVertex.setInvokableClass(IterationTailTask.class);
                        // Hack
                        
tailConfigOfTerminationCriterion.setIsSolutionSetUpdate();
                        
tailConfigOfTerminationCriterion.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
@@ -1457,7 +1457,7 @@ public class JobGraphGenerator implements 
Visitor<PlanNode> {
                                worksetTailConfig.setIsWorksetUpdate();
                                
                                if (hasWorksetTail) {
-                                       
nextWorksetVertex.setInvokableClass(IterationTailPactTask.class);
+                                       
nextWorksetVertex.setInvokableClass(IterationTailTask.class);
                                        
                                        
worksetTailConfig.setOutputSerializer(iterNode.getWorksetSerializer());
                                }
@@ -1481,7 +1481,7 @@ public class JobGraphGenerator implements 
Visitor<PlanNode> {
                                solutionDeltaConfig.setIsSolutionSetUpdate();
                                
                                if (hasSolutionSetTail) {
-                                       
solutionDeltaVertex.setInvokableClass(IterationTailPactTask.class);
+                                       
solutionDeltaVertex.setInvokableClass(IterationTailTask.class);
                                        
                                        
solutionDeltaConfig.setOutputSerializer(iterNode.getSolutionSetSerializer());
                                        

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime-web/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/resources/logback-test.xml 
b/flink-runtime-web/src/test/resources/logback-test.xml
index 9d4f644..2235251 100644
--- a/flink-runtime-web/src/test/resources/logback-test.xml
+++ b/flink-runtime-web/src/test/resources/logback-test.xml
@@ -31,7 +31,7 @@
          throw error to test failing scenarios. Logging those would overflow 
the log. -->
          <!---->
     <logger name="org.apache.flink.runtime.operators.DataSinkTask" 
level="OFF"/>
-    <logger name="org.apache.flink.runtime.operators.RegularPactTask" 
level="OFF"/>
+    <logger name="org.apache.flink.runtime.operators.BatchTask" level="OFF"/>
     <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
     <logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>
     <logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
index 660a62c..7d0454e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
@@ -24,7 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 
 public class BroadcastVariableManager {
        
@@ -33,7 +33,7 @@ public class BroadcastVariableManager {
        
        // 
--------------------------------------------------------------------------------------------
        
-       public <T> BroadcastVariableMaterialization<T, ?> 
materializeBroadcastVariable(String name, int superstep, RegularPactTask<?, ?> 
holder, 
+       public <T> BroadcastVariableMaterialization<T, ?> 
materializeBroadcastVariable(String name, int superstep, BatchTask<?, ?> holder,
                        MutableReader<?> reader, TypeSerializerFactory<T> 
serializerFactory) throws IOException
        {
                final BroadcastVariableKey key = new 
BroadcastVariableKey(holder.getEnvironment().getJobVertexId(), name, superstep);
@@ -77,12 +77,12 @@ public class BroadcastVariableManager {
        }
        
        
-       public void releaseReference(String name, int superstep, 
RegularPactTask<?, ?> referenceHolder) {
+       public void releaseReference(String name, int superstep, BatchTask<?, 
?> referenceHolder) {
                BroadcastVariableKey key = new 
BroadcastVariableKey(referenceHolder.getEnvironment().getJobVertexId(), name, 
superstep);
                releaseReference(key, referenceHolder);
        }
        
-       public void releaseReference(BroadcastVariableKey key, 
RegularPactTask<?, ?> referenceHolder) {
+       public void releaseReference(BroadcastVariableKey key, BatchTask<?, ?> 
referenceHolder) {
                BroadcastVariableMaterialization<?, ?> mat = variables.get(key);
                
                // release this reference
@@ -93,7 +93,7 @@ public class BroadcastVariableManager {
        }
        
        
-       public void releaseAllReferencesFromTask(RegularPactTask<?, ?> 
referenceHolder) {
+       public void releaseAllReferencesFromTask(BatchTask<?, ?> 
referenceHolder) {
                // go through all registered variables 
                for (Map.Entry<BroadcastVariableKey, 
BroadcastVariableMaterialization<?, ?>> entry : variables.entrySet()) {
                        BroadcastVariableMaterialization<?, ?> mat = 
entry.getValue();

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
index c4dd8a9..86e0111 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
@@ -28,7 +28,7 @@ import 
org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.util.ReaderIterator;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.slf4j.Logger;
@@ -44,7 +44,7 @@ public class BroadcastVariableMaterialization<T, C> {
        private static final Logger LOG = 
LoggerFactory.getLogger(BroadcastVariableMaterialization.class);
        
        
-       private final Set<RegularPactTask<?, ?>> references = new 
HashSet<RegularPactTask<?,?>>();
+       private final Set<BatchTask<?, ?>> references = new 
HashSet<BatchTask<?,?>>();
        
        private final Object materializationMonitor = new Object();
        
@@ -65,7 +65,7 @@ public class BroadcastVariableMaterialization<T, C> {
 
        // 
--------------------------------------------------------------------------------------------
        
-       public void materializeVariable(MutableReader<?> reader, 
TypeSerializerFactory<?> serializerFactory, RegularPactTask<?, ?> 
referenceHolder)
+       public void materializeVariable(MutableReader<?> reader, 
TypeSerializerFactory<?> serializerFactory, BatchTask<?, ?> referenceHolder)
                        throws MaterializationExpiredException, IOException
        {
                Preconditions.checkNotNull(reader);
@@ -156,15 +156,15 @@ public class BroadcastVariableMaterialization<T, C> {
                }
        }
        
-       public boolean decrementReference(RegularPactTask<?, ?> 
referenceHolder) {
+       public boolean decrementReference(BatchTask<?, ?> referenceHolder) {
                return decrementReferenceInternal(referenceHolder, true);
        }
        
-       public boolean decrementReferenceIfHeld(RegularPactTask<?, ?> 
referenceHolder) {
+       public boolean decrementReferenceIfHeld(BatchTask<?, ?> 
referenceHolder) {
                return decrementReferenceInternal(referenceHolder, false);
        }
        
-       private boolean decrementReferenceInternal(RegularPactTask<?, ?> 
referenceHolder, boolean errorIfNoReference) {
+       private boolean decrementReferenceInternal(BatchTask<?, ?> 
referenceHolder, boolean errorIfNoReference) {
                synchronized (references) {
                        if (disposed || references.isEmpty()) {
                                if (errorIfNoReference) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrier.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrier.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrier.java
index bc11d3f..ffc74d7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrier.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrier.java
@@ -18,12 +18,15 @@
 
 package org.apache.flink.runtime.iterative.concurrent;
 
+import org.apache.flink.runtime.iterative.task.IterationHeadTask;
+import org.apache.flink.runtime.iterative.task.IterationTailTask;
+
 import java.util.concurrent.CountDownLatch;
 
 /**
  * Resettable barrier to synchronize the
- * {@link org.apache.flink.runtime.iterative.task.IterationHeadPactTask} and
- * the {@link org.apache.flink.runtime.iterative.task.IterationTailPactTask} 
in case of
+ * {@link IterationHeadTask} and
+ * the {@link IterationTailTask} in case of
  * iterations that contain a separate solution set tail.
  */
 public class SolutionSetUpdateBarrier {

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
index abbecde..352a262 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
@@ -18,10 +18,13 @@
 
 package org.apache.flink.runtime.iterative.concurrent;
 
+import org.apache.flink.runtime.iterative.task.IterationHeadTask;
+import org.apache.flink.runtime.iterative.task.IterationTailTask;
+
 /**
  * Broker to hand over {@link SolutionSetUpdateBarrier} from 
- * {@link org.apache.flink.runtime.iterative.task.IterationHeadPactTask} to
- * {@link org.apache.flink.runtime.iterative.task.IterationTailPactTask}.
+ * {@link IterationHeadTask} to
+ * {@link IterationTailTask}.
  */
 public class SolutionSetUpdateBarrierBroker extends 
Broker<SolutionSetUpdateBarrier> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
deleted file mode 100644
index efe74f9..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ /dev/null
@@ -1,395 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.iterative.task;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.core.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.common.operators.util.JoinHashMap;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.io.network.api.reader.MutableReader;
-import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
-import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
-import org.apache.flink.runtime.iterative.concurrent.Broker;
-import org.apache.flink.runtime.iterative.concurrent.IterationAggregatorBroker;
-import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
-import 
org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
-import 
org.apache.flink.runtime.iterative.io.SolutionSetObjectsUpdateOutputCollector;
-import org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector;
-import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
-import org.apache.flink.runtime.operators.PactDriver;
-import org.apache.flink.runtime.operators.RegularPactTask;
-import org.apache.flink.runtime.operators.ResettablePactDriver;
-import org.apache.flink.runtime.operators.hash.CompactingHashTable;
-import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.MutableObjectIterator;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Map;
-import java.util.concurrent.Future;
-
-/**
- * The abstract base class for all tasks able to participate in an iteration.
- */
-public abstract class AbstractIterativePactTask<S extends Function, OT> 
extends RegularPactTask<S, OT>
-               implements Terminable
-{
-       private static final Logger log = 
LoggerFactory.getLogger(AbstractIterativePactTask.class);
-       
-       protected LongSumAggregator worksetAggregator;
-
-       protected BlockingBackChannel worksetBackChannel;
-
-       protected boolean isWorksetIteration;
-
-       protected boolean isWorksetUpdate;
-
-       protected boolean isSolutionSetUpdate;
-       
-
-       private RuntimeAggregatorRegistry iterationAggregators;
-
-       private String brokerKey;
-
-       private int superstepNum = 1;
-       
-       private volatile boolean terminationRequested;
-
-       // 
--------------------------------------------------------------------------------------------
-       // Main life cycle methods that implement the iterative behavior
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       protected void initialize() throws Exception {
-               super.initialize();
-
-               // check if the driver is resettable
-               if (this.driver instanceof ResettablePactDriver) {
-                       final ResettablePactDriver<?, ?> resDriver = 
(ResettablePactDriver<?, ?>) this.driver;
-                       // make sure that the according inputs are not reseted
-                       for (int i = 0; i < resDriver.getNumberOfInputs(); i++) 
{
-                               if (resDriver.isInputResettable(i)) {
-                                       excludeFromReset(i);
-                               }
-                       }
-               }
-               
-               TaskConfig config = getLastTasksConfig();
-               isWorksetIteration = config.getIsWorksetIteration();
-               isWorksetUpdate = config.getIsWorksetUpdate();
-               isSolutionSetUpdate = config.getIsSolutionSetUpdate();
-
-               if (isWorksetUpdate) {
-                       worksetBackChannel = 
BlockingBackChannelBroker.instance().getAndRemove(brokerKey());
-
-                       if (isWorksetIteration) {
-                               worksetAggregator = 
getIterationAggregators().getAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME);
-
-                               if (worksetAggregator == null) {
-                                       throw new RuntimeException("Missing 
workset elements count aggregator.");
-                               }
-                       }
-               }
-       }
-
-       @Override
-       public void run() throws Exception {
-               if (inFirstIteration()) {
-                       if (this.driver instanceof ResettablePactDriver) {
-                               // initialize the repeatable driver
-                               ((ResettablePactDriver<?, ?>) 
this.driver).initialize();
-                       }
-               } else {
-                       reinstantiateDriver();
-                       resetAllInputs();
-                       
-                       // re-read the iterative broadcast variables
-                       for (int i : this.iterativeBroadcastInputs) {
-                               final String name = 
getTaskConfig().getBroadcastInputName(i);
-                               readAndSetBroadcastInput(i, name, 
this.runtimeUdfContext, superstepNum);
-                       }
-               }
-
-               // call the parent to execute the superstep
-               super.run();
-               
-               // release the iterative broadcast variables
-               for (int i : this.iterativeBroadcastInputs) {
-                       final String name = 
getTaskConfig().getBroadcastInputName(i);
-                       releaseBroadcastVariables(name, superstepNum, 
this.runtimeUdfContext);
-               }
-       }
-
-       @Override
-       protected void closeLocalStrategiesAndCaches() {
-               try {
-                       super.closeLocalStrategiesAndCaches();
-               }
-               finally {
-                       if (this.driver instanceof ResettablePactDriver) {
-                               final ResettablePactDriver<?, ?> resDriver = 
(ResettablePactDriver<?, ?>) this.driver;
-                               try {
-                                       resDriver.teardown();
-                               } catch (Throwable t) {
-                                       log.error("Error while shutting down an 
iterative operator.", t);
-                               }
-                       }
-               }
-       }
-
-       @Override
-       public DistributedRuntimeUDFContext createRuntimeContext(String 
taskName) {
-               Environment env = getEnvironment();
-               return new IterativeRuntimeUdfContext(taskName, 
env.getNumberOfSubtasks(),
-                               env.getIndexInSubtaskGroup(), 
getUserCodeClassLoader(), getExecutionConfig(),
-                               env.getDistributedCacheEntries(), 
this.accumulatorMap);
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       // Utility Methods for Iteration Handling
-       // 
--------------------------------------------------------------------------------------------
-
-       protected boolean inFirstIteration() {
-               return this.superstepNum == 1;
-       }
-
-       protected int currentIteration() {
-               return this.superstepNum;
-       }
-
-       protected void incrementIterationCounter() {
-               this.superstepNum++;
-       }
-
-       public String brokerKey() {
-               if (brokerKey == null) {
-                       int iterationId = config.getIterationId();
-                       brokerKey = getEnvironment().getJobID().toString() + 
'#' + iterationId + '#' +
-                                       
getEnvironment().getIndexInSubtaskGroup();
-               }
-               return brokerKey;
-       }
-
-       private void reinstantiateDriver() throws Exception {
-               if (this.driver instanceof ResettablePactDriver) {
-                       final ResettablePactDriver<?, ?> resDriver = 
(ResettablePactDriver<?, ?>) this.driver;
-                       resDriver.reset();
-               } else {
-                       Class<? extends PactDriver<S, OT>> driverClass = 
this.config.getDriver();
-                       this.driver = 
InstantiationUtil.instantiate(driverClass, PactDriver.class);
-
-                       try {
-                               this.driver.setup(this);
-                       }
-                       catch (Throwable t) {
-                               throw new Exception("The pact driver setup for 
'" + this.getEnvironment().getTaskName() +
-                                               "' , caused an error: " + 
t.getMessage(), t);
-                       }
-               }
-       }
-
-       public RuntimeAggregatorRegistry getIterationAggregators() {
-               if (this.iterationAggregators == null) {
-                       this.iterationAggregators = 
IterationAggregatorBroker.instance().get(brokerKey());
-               }
-               return this.iterationAggregators;
-       }
-
-       protected void verifyEndOfSuperstepState() throws IOException {
-               // sanity check that there is at least one iterative input 
reader
-               if (this.iterativeInputs.length == 0 && 
this.iterativeBroadcastInputs.length == 0) {
-                       throw new IllegalStateException("Error: Iterative task 
without a single iterative input.");
-               }
-
-               for (int inputNum : this.iterativeInputs) {
-                       MutableReader<?> reader = this.inputReaders[inputNum];
-
-                       if (!reader.isFinished()) {
-                               if (reader.hasReachedEndOfSuperstep()) {
-                                       reader.startNextSuperstep();
-                               }
-                               else {
-                                       // need to read and drop all 
non-consumed data until we reach the end-of-superstep
-                                       @SuppressWarnings("unchecked")
-                                       MutableObjectIterator<Object> inIter = 
(MutableObjectIterator<Object>) this.inputIterators[inputNum];
-                                       Object o = 
this.inputSerializers[inputNum].getSerializer().createInstance();
-                                       while ((o = inIter.next(o)) != null);
-                                       
-                                       if (!reader.isFinished()) {
-                                               // also reset the 
end-of-superstep state
-                                               reader.startNextSuperstep();
-                                       }
-                               }
-                       }
-               }
-               
-               for (int inputNum : this.iterativeBroadcastInputs) {
-                       MutableReader<?> reader = 
this.broadcastInputReaders[inputNum];
-
-                       if (!reader.isFinished()) {
-                               
-                               // sanity check that the BC input is at the end 
of the superstep
-                               if (!reader.hasReachedEndOfSuperstep()) {
-                                       throw new IllegalStateException("An 
iterative broadcast input has not been fully consumed.");
-                               }
-                               
-                               reader.startNextSuperstep();
-                       }
-               }
-       }
-
-       @Override
-       public boolean terminationRequested() {
-               return this.terminationRequested;
-       }
-
-       @Override
-       public void requestTermination() {
-               this.terminationRequested = true;
-       }
-
-       @Override
-       public void cancel() throws Exception {
-               requestTermination();
-               super.cancel();
-       }
-
-       // 
-----------------------------------------------------------------------------------------------------------------
-       // Iteration State Update Handling
-       // 
-----------------------------------------------------------------------------------------------------------------
-
-       /**
-        * Creates a new {@link WorksetUpdateOutputCollector}.
-        * <p>
-        * This collector is used by {@link IterationIntermediatePactTask} or 
{@link IterationTailPactTask} to update the
-        * workset.
-        * <p>
-        * If a non-null delegate is given, the new {@link Collector} will 
write to the solution set and also call
-        * collect(T) of the delegate.
-        *
-        * @param delegate null -OR- the delegate on which to call collect() by 
the newly created collector
-        * @return a new {@link WorksetUpdateOutputCollector}
-        */
-       protected Collector<OT> 
createWorksetUpdateOutputCollector(Collector<OT> delegate) {
-               DataOutputView outputView = worksetBackChannel.getWriteEnd();
-               TypeSerializer<OT> serializer = getOutputSerializer();
-               return new WorksetUpdateOutputCollector<OT>(outputView, 
serializer, delegate);
-       }
-
-       protected Collector<OT> createWorksetUpdateOutputCollector() {
-               return createWorksetUpdateOutputCollector(null);
-       }
-
-       /**
-        * Creates a new solution set update output collector.
-        * <p>
-        * This collector is used by {@link IterationIntermediatePactTask} or 
{@link IterationTailPactTask} to update the
-        * solution set of workset iterations. Depending on the task 
configuration, either a fast (non-probing)
-        * {@link 
org.apache.flink.runtime.iterative.io.SolutionSetFastUpdateOutputCollector} or 
normal (re-probing)
-        * {@link SolutionSetUpdateOutputCollector} is created.
-        * <p>
-        * If a non-null delegate is given, the new {@link Collector} will 
write back to the solution set and also call
-        * collect(T) of the delegate.
-        *
-        * @param delegate null -OR- a delegate collector to be called by the 
newly created collector
-        * @return a new {@link 
org.apache.flink.runtime.iterative.io.SolutionSetFastUpdateOutputCollector} or
-        * {@link SolutionSetUpdateOutputCollector}
-        */
-       protected Collector<OT> 
createSolutionSetUpdateOutputCollector(Collector<OT> delegate) {
-               Broker<Object> solutionSetBroker = SolutionSetBroker.instance();
-               
-               Object ss = solutionSetBroker.get(brokerKey());
-               if (ss instanceof CompactingHashTable) {
-                       @SuppressWarnings("unchecked")
-                       CompactingHashTable<OT> solutionSet = 
(CompactingHashTable<OT>) ss;
-                       return new 
SolutionSetUpdateOutputCollector<OT>(solutionSet, delegate);
-               }
-               else if (ss instanceof JoinHashMap) {
-                       @SuppressWarnings("unchecked")
-                       JoinHashMap<OT> map = (JoinHashMap<OT>) ss;
-                       return new 
SolutionSetObjectsUpdateOutputCollector<OT>(map, delegate);
-               } else {
-                       throw new RuntimeException("Unrecognized solution set 
handle: " + ss);
-               }
-       }
-
-       /**
-        * @return output serializer of this task
-        */
-       private TypeSerializer<OT> getOutputSerializer() {
-               TypeSerializerFactory<OT> serializerFactory;
-
-               if ((serializerFactory = 
getLastTasksConfig().getOutputSerializer(getUserCodeClassLoader())) ==
-                               null) {
-                       throw new RuntimeException("Missing output serializer 
for workset update.");
-               }
-
-               return serializerFactory.getSerializer();
-       }
-
-       // 
-----------------------------------------------------------------------------------------------------------------
-
-       private class IterativeRuntimeUdfContext extends 
DistributedRuntimeUDFContext implements IterationRuntimeContext {
-
-               public IterativeRuntimeUdfContext(String name, int 
numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
-                                                                               
ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks,
-                                                                               
Map<String, Accumulator<?,?>> accumulatorMap) {
-                       super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, executionConfig, cpTasks, accumulatorMap);
-               }
-
-               @Override
-               public int getSuperstepNumber() {
-                       return AbstractIterativePactTask.this.superstepNum;
-               }
-
-               @Override
-               public <T extends Aggregator<?>> T 
getIterationAggregator(String name) {
-                       return getIterationAggregators().<T>getAggregator(name);
-               }
-
-               @Override
-               @SuppressWarnings("unchecked")
-               public <T extends Value> T getPreviousIterationAggregate(String 
name) {
-                       return (T) 
getIterationAggregators().getPreviousGlobalAggregate(name);
-               }
-
-               @Override
-               public <V, A extends Serializable> void addAccumulator(String 
name, Accumulator<V, A> newAccumulator) {
-                       // only add accumulator on first iteration
-                       if (inFirstIteration()) {
-                               super.addAccumulator(name, newAccumulator);
-                       }
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
new file mode 100644
index 0000000..215111b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.iterative.task;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.operators.util.JoinHashMap;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.reader.MutableReader;
+import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
+import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
+import org.apache.flink.runtime.iterative.concurrent.Broker;
+import org.apache.flink.runtime.iterative.concurrent.IterationAggregatorBroker;
+import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
+import 
org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
+import 
org.apache.flink.runtime.iterative.io.SolutionSetObjectsUpdateOutputCollector;
+import org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector;
+import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
+import org.apache.flink.runtime.operators.Driver;
+import org.apache.flink.runtime.operators.ResettableDriver;
+import org.apache.flink.runtime.operators.hash.CompactingHashTable;
+import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+/**
+ * The abstract base class for all tasks able to participate in an iteration.
+ */
+public abstract class AbstractIterativeTask<S extends Function, OT> extends 
BatchTask<S, OT>
+               implements Terminable
+{
+       private static final Logger log = 
LoggerFactory.getLogger(AbstractIterativeTask.class);
+       
+       protected LongSumAggregator worksetAggregator;
+
+       protected BlockingBackChannel worksetBackChannel;
+
+       protected boolean isWorksetIteration;
+
+       protected boolean isWorksetUpdate;
+
+       protected boolean isSolutionSetUpdate;
+       
+
+       private RuntimeAggregatorRegistry iterationAggregators;
+
+       private String brokerKey;
+
+       private int superstepNum = 1;
+       
+       private volatile boolean terminationRequested;
+
+       // 
--------------------------------------------------------------------------------------------
+       // Main life cycle methods that implement the iterative behavior
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       protected void initialize() throws Exception {
+               super.initialize();
+
+               // check if the driver is resettable
+               if (this.driver instanceof ResettableDriver) {
+                       final ResettableDriver<?, ?> resDriver = 
(ResettableDriver<?, ?>) this.driver;
+                       // make sure that the according inputs are not reseted
+                       for (int i = 0; i < resDriver.getNumberOfInputs(); i++) 
{
+                               if (resDriver.isInputResettable(i)) {
+                                       excludeFromReset(i);
+                               }
+                       }
+               }
+               
+               TaskConfig config = getLastTasksConfig();
+               isWorksetIteration = config.getIsWorksetIteration();
+               isWorksetUpdate = config.getIsWorksetUpdate();
+               isSolutionSetUpdate = config.getIsSolutionSetUpdate();
+
+               if (isWorksetUpdate) {
+                       worksetBackChannel = 
BlockingBackChannelBroker.instance().getAndRemove(brokerKey());
+
+                       if (isWorksetIteration) {
+                               worksetAggregator = 
getIterationAggregators().getAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME);
+
+                               if (worksetAggregator == null) {
+                                       throw new RuntimeException("Missing 
workset elements count aggregator.");
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public void run() throws Exception {
+               if (inFirstIteration()) {
+                       if (this.driver instanceof ResettableDriver) {
+                               // initialize the repeatable driver
+                               ((ResettableDriver<?, ?>) 
this.driver).initialize();
+                       }
+               } else {
+                       reinstantiateDriver();
+                       resetAllInputs();
+                       
+                       // re-read the iterative broadcast variables
+                       for (int i : this.iterativeBroadcastInputs) {
+                               final String name = 
getTaskConfig().getBroadcastInputName(i);
+                               readAndSetBroadcastInput(i, name, 
this.runtimeUdfContext, superstepNum);
+                       }
+               }
+
+               // call the parent to execute the superstep
+               super.run();
+               
+               // release the iterative broadcast variables
+               for (int i : this.iterativeBroadcastInputs) {
+                       final String name = 
getTaskConfig().getBroadcastInputName(i);
+                       releaseBroadcastVariables(name, superstepNum, 
this.runtimeUdfContext);
+               }
+       }
+
+       @Override
+       protected void closeLocalStrategiesAndCaches() {
+               try {
+                       super.closeLocalStrategiesAndCaches();
+               }
+               finally {
+                       if (this.driver instanceof ResettableDriver) {
+                               final ResettableDriver<?, ?> resDriver = 
(ResettableDriver<?, ?>) this.driver;
+                               try {
+                                       resDriver.teardown();
+                               } catch (Throwable t) {
+                                       log.error("Error while shutting down an 
iterative operator.", t);
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public DistributedRuntimeUDFContext createRuntimeContext(String 
taskName) {
+               Environment env = getEnvironment();
+               return new IterativeRuntimeUdfContext(taskName, 
env.getNumberOfSubtasks(),
+                               env.getIndexInSubtaskGroup(), 
getUserCodeClassLoader(), getExecutionConfig(),
+                               env.getDistributedCacheEntries(), 
this.accumulatorMap);
+       }
+
+       // 
--------------------------------------------------------------------------------------------
+       // Utility Methods for Iteration Handling
+       // 
--------------------------------------------------------------------------------------------
+
+       protected boolean inFirstIteration() {
+               return this.superstepNum == 1;
+       }
+
+       protected int currentIteration() {
+               return this.superstepNum;
+       }
+
+       protected void incrementIterationCounter() {
+               this.superstepNum++;
+       }
+
+       public String brokerKey() {
+               if (brokerKey == null) {
+                       int iterationId = config.getIterationId();
+                       brokerKey = getEnvironment().getJobID().toString() + 
'#' + iterationId + '#' +
+                                       
getEnvironment().getIndexInSubtaskGroup();
+               }
+               return brokerKey;
+       }
+
+       private void reinstantiateDriver() throws Exception {
+               if (this.driver instanceof ResettableDriver) {
+                       final ResettableDriver<?, ?> resDriver = 
(ResettableDriver<?, ?>) this.driver;
+                       resDriver.reset();
+               } else {
+                       Class<? extends Driver<S, OT>> driverClass = 
this.config.getDriver();
+                       this.driver = 
InstantiationUtil.instantiate(driverClass, Driver.class);
+
+                       try {
+                               this.driver.setup(this);
+                       }
+                       catch (Throwable t) {
+                               throw new Exception("The pact driver setup for 
'" + this.getEnvironment().getTaskName() +
+                                               "' , caused an error: " + 
t.getMessage(), t);
+                       }
+               }
+       }
+
+       public RuntimeAggregatorRegistry getIterationAggregators() {
+               if (this.iterationAggregators == null) {
+                       this.iterationAggregators = 
IterationAggregatorBroker.instance().get(brokerKey());
+               }
+               return this.iterationAggregators;
+       }
+
+       protected void verifyEndOfSuperstepState() throws IOException {
+               // sanity check that there is at least one iterative input 
reader
+               if (this.iterativeInputs.length == 0 && 
this.iterativeBroadcastInputs.length == 0) {
+                       throw new IllegalStateException("Error: Iterative task 
without a single iterative input.");
+               }
+
+               for (int inputNum : this.iterativeInputs) {
+                       MutableReader<?> reader = this.inputReaders[inputNum];
+
+                       if (!reader.isFinished()) {
+                               if (reader.hasReachedEndOfSuperstep()) {
+                                       reader.startNextSuperstep();
+                               }
+                               else {
+                                       // need to read and drop all 
non-consumed data until we reach the end-of-superstep
+                                       @SuppressWarnings("unchecked")
+                                       MutableObjectIterator<Object> inIter = 
(MutableObjectIterator<Object>) this.inputIterators[inputNum];
+                                       Object o = 
this.inputSerializers[inputNum].getSerializer().createInstance();
+                                       while ((o = inIter.next(o)) != null);
+                                       
+                                       if (!reader.isFinished()) {
+                                               // also reset the 
end-of-superstep state
+                                               reader.startNextSuperstep();
+                                       }
+                               }
+                       }
+               }
+               
+               for (int inputNum : this.iterativeBroadcastInputs) {
+                       MutableReader<?> reader = 
this.broadcastInputReaders[inputNum];
+
+                       if (!reader.isFinished()) {
+                               
+                               // sanity check that the BC input is at the end 
of the superstep
+                               if (!reader.hasReachedEndOfSuperstep()) {
+                                       throw new IllegalStateException("An 
iterative broadcast input has not been fully consumed.");
+                               }
+                               
+                               reader.startNextSuperstep();
+                       }
+               }
+       }
+
+       @Override
+       public boolean terminationRequested() {
+               return this.terminationRequested;
+       }
+
+       @Override
+       public void requestTermination() {
+               this.terminationRequested = true;
+       }
+
+       @Override
+       public void cancel() throws Exception {
+               requestTermination();
+               super.cancel();
+       }
+
+       // 
-----------------------------------------------------------------------------------------------------------------
+       // Iteration State Update Handling
+       // 
-----------------------------------------------------------------------------------------------------------------
+
+       /**
+        * Creates a new {@link WorksetUpdateOutputCollector}.
+        * <p>
+        * This collector is used by {@link IterationIntermediateTask} or 
{@link IterationTailTask} to update the
+        * workset.
+        * <p>
+        * If a non-null delegate is given, the new {@link Collector} will 
write to the solution set and also call
+        * collect(T) of the delegate.
+        *
+        * @param delegate null -OR- the delegate on which to call collect() by 
the newly created collector
+        * @return a new {@link WorksetUpdateOutputCollector}
+        */
+       protected Collector<OT> 
createWorksetUpdateOutputCollector(Collector<OT> delegate) {
+               DataOutputView outputView = worksetBackChannel.getWriteEnd();
+               TypeSerializer<OT> serializer = getOutputSerializer();
+               return new WorksetUpdateOutputCollector<OT>(outputView, 
serializer, delegate);
+       }
+
+       protected Collector<OT> createWorksetUpdateOutputCollector() {
+               return createWorksetUpdateOutputCollector(null);
+       }
+
+       /**
+        * Creates a new solution set update output collector.
+        * <p>
+        * This collector is used by {@link IterationIntermediateTask} or 
{@link IterationTailTask} to update the
+        * solution set of workset iterations. Depending on the task 
configuration, either a fast (non-probing)
+        * {@link 
org.apache.flink.runtime.iterative.io.SolutionSetFastUpdateOutputCollector} or 
normal (re-probing)
+        * {@link SolutionSetUpdateOutputCollector} is created.
+        * <p>
+        * If a non-null delegate is given, the new {@link Collector} will 
write back to the solution set and also call
+        * collect(T) of the delegate.
+        *
+        * @param delegate null -OR- a delegate collector to be called by the 
newly created collector
+        * @return a new {@link 
org.apache.flink.runtime.iterative.io.SolutionSetFastUpdateOutputCollector} or
+        * {@link SolutionSetUpdateOutputCollector}
+        */
+       protected Collector<OT> 
createSolutionSetUpdateOutputCollector(Collector<OT> delegate) {
+               Broker<Object> solutionSetBroker = SolutionSetBroker.instance();
+               
+               Object ss = solutionSetBroker.get(brokerKey());
+               if (ss instanceof CompactingHashTable) {
+                       @SuppressWarnings("unchecked")
+                       CompactingHashTable<OT> solutionSet = 
(CompactingHashTable<OT>) ss;
+                       return new 
SolutionSetUpdateOutputCollector<OT>(solutionSet, delegate);
+               }
+               else if (ss instanceof JoinHashMap) {
+                       @SuppressWarnings("unchecked")
+                       JoinHashMap<OT> map = (JoinHashMap<OT>) ss;
+                       return new 
SolutionSetObjectsUpdateOutputCollector<OT>(map, delegate);
+               } else {
+                       throw new RuntimeException("Unrecognized solution set 
handle: " + ss);
+               }
+       }
+
+       /**
+        * @return output serializer of this task
+        */
+       private TypeSerializer<OT> getOutputSerializer() {
+               TypeSerializerFactory<OT> serializerFactory;
+
+               if ((serializerFactory = 
getLastTasksConfig().getOutputSerializer(getUserCodeClassLoader())) ==
+                               null) {
+                       throw new RuntimeException("Missing output serializer 
for workset update.");
+               }
+
+               return serializerFactory.getSerializer();
+       }
+
+       // 
-----------------------------------------------------------------------------------------------------------------
+
+       private class IterativeRuntimeUdfContext extends 
DistributedRuntimeUDFContext implements IterationRuntimeContext {
+
+               public IterativeRuntimeUdfContext(String name, int 
numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
+                                                                               
ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks,
+                                                                               
Map<String, Accumulator<?,?>> accumulatorMap) {
+                       super(name, numParallelSubtasks, subtaskIndex, 
userCodeClassLoader, executionConfig, cpTasks, accumulatorMap);
+               }
+
+               @Override
+               public int getSuperstepNumber() {
+                       return AbstractIterativeTask.this.superstepNum;
+               }
+
+               @Override
+               public <T extends Aggregator<?>> T 
getIterationAggregator(String name) {
+                       return getIterationAggregators().<T>getAggregator(name);
+               }
+
+               @Override
+               @SuppressWarnings("unchecked")
+               public <T extends Value> T getPreviousIterationAggregate(String 
name) {
+                       return (T) 
getIterationAggregators().getPreviousGlobalAggregate(name);
+               }
+
+               @Override
+               public <V, A extends Serializable> void addAccumulator(String 
name, Accumulator<V, A> newAccumulator) {
+                       // only add accumulator on first iteration
+                       if (inFirstIteration()) {
+                               super.addAccumulator(name, newAccumulator);
+                       }
+               }
+       }
+
+}

Reply via email to