http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
deleted file mode 100644
index 9cb045f..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ /dev/null
@@ -1,440 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.iterative.task;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.operators.util.JoinHashMap;
-import org.apache.flink.api.common.typeutils.TypeComparator;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.disk.InputViewIterator;
-import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
-import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
-import org.apache.flink.runtime.iterative.concurrent.Broker;
-import org.apache.flink.runtime.iterative.concurrent.IterationAggregatorBroker;
-import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
-import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
-import 
org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
-import org.apache.flink.runtime.iterative.concurrent.SuperstepBarrier;
-import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
-import 
org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
-import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
-import org.apache.flink.runtime.iterative.event.TerminationEvent;
-import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
-import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
-import org.apache.flink.runtime.operators.RegularPactTask;
-import org.apache.flink.runtime.operators.hash.CompactingHashTable;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-/**
- * The head is responsible for coordinating an iteration and can run a
- * {@link org.apache.flink.runtime.operators.PactDriver} inside. It will read
- * the initial input and establish a {@link BlockingBackChannel} to the 
iteration's tail. After successfully processing
- * the input, it will send EndOfSuperstep events to its outputs. It must also 
be connected to a
- * synchronization task and after each superstep, it will wait
- * until it receives an {@link AllWorkersDoneEvent} from the sync, which 
signals that all other heads have also finished
- * their iteration. Starting with
- * the second iteration, the input for the head is the output of the tail, 
transmitted through the backchannel. Once the
- * iteration is done, the head
- * will send a {@link TerminationEvent} to all its connected tasks, signaling 
them to shut down.
- * <p>
- * Assumption on the ordering of the outputs: - The first n output gates write 
to channels that go to the tasks of the
- * step function. - The next m output gates go to the tasks that consume the 
final solution. - The last output gate
- * connects to the synchronization task.
- * 
- * @param <X>
- *        The type of the bulk partial solution / solution set and the final 
output.
- * @param <Y>
- *        The type of the feed-back data set (bulk partial solution / 
workset). For bulk iterations, {@code Y} is the
- *        same as {@code X}
- */
-public class IterationHeadPactTask<X, Y, S extends Function, OT> extends 
AbstractIterativePactTask<S, OT> {
-
-       private static final Logger log = 
LoggerFactory.getLogger(IterationHeadPactTask.class); // class-wide logger
-
-       private Collector<X> finalOutputCollector; // collector for the final result; created in initOutputs(), closed at the end of run()
-
-       private TypeSerializerFactory<Y> feedbackTypeSerializer; // serializer factory for records read back from the backchannel; set in run()
-
-       private TypeSerializerFactory<X> solutionTypeSerializer; // serializer factory for solution records; set in run() for workset and bulk iterations
-
-       private ResultPartitionWriter toSync; // writer of the output gate that leads to the synchronization task; set in initOutputs()
-
-       private int feedbackDataInput; // index of the input carrying the workset or bulk partial solution
-
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       protected int getNumTaskInputs() { // reports how many inputs this head task reads
-               // this task has an additional input in the workset case for 
the initial solution set
-               boolean isWorkset = config.getIsWorksetIteration();
-               return driver.getNumberOfInputs() + (isWorkset ? 1 : 0); // driver inputs, plus one in a workset iteration
-       }
-
-       @Override
-       protected void initOutputs() throws Exception { // creates step-function outputs, final-result outputs and the sync writer, in that order
-               // initialize the regular outputs first (the ones into the step 
function).
-               super.initOutputs();
-
-               // at this time, the outputs to the step function are created
-               // add the outputs for the final solution
-               List<RecordWriter<?>> finalOutputWriters = new 
ArrayList<RecordWriter<?>>(); // presumably filled by getOutputCollector below; its size is checked afterwards
-               final TaskConfig finalOutConfig = 
this.config.getIterationHeadFinalOutputConfig();
-               final ClassLoader userCodeClassLoader = 
getUserCodeClassLoader();
-               AccumulatorRegistry.Reporter reporter = 
getEnvironment().getAccumulatorRegistry().getReadWriteReporter();
-               this.finalOutputCollector = 
RegularPactTask.getOutputCollector(this, finalOutConfig,
-                       userCodeClassLoader, finalOutputWriters, 
config.getNumOutputs(), finalOutConfig.getNumOutputs(), reporter);
-
-               // sanity check the setup
-               final int writersIntoStepFunction = this.eventualOutputs.size();
-               final int writersIntoFinalResult = finalOutputWriters.size();
-               final int syncGateIndex = 
this.config.getIterationHeadIndexOfSyncOutput();
-
-               if (writersIntoStepFunction + writersIntoFinalResult != 
syncGateIndex) { // the sync gate must come directly after all step-function and final-result gates
-                       throw new Exception("Error: Inconsistent head task 
setup - wrong mapping of output gates.");
-               }
-               // now, we can instantiate the sync gate
-               this.toSync = getEnvironment().getWriter(syncGateIndex);
-       }
-
-       /**
-        * Prepares the backchannel of the iteration head. The method computes the
-        * number of memory pages configured for the backchannel, allocates them from
-        * the memory manager, wraps them in a {@link SerializedUpdateBuffer} inside a
-        * new {@link BlockingBackChannel}, and hands that channel to the
-        * {@link BlockingBackChannelBroker} singleton under the broker key of this
-        * task, so that the iteration tail can pick it up.
-        *
-        * @return the newly created backchannel, already registered at the broker
-        * @throws Exception declared by the signature; presumably raised when the
-        *         pages cannot be allocated (TODO confirm against the memory manager)
-        */
-       private BlockingBackChannel initBackChannel() throws Exception {
-
-               /* get the size of the memory available to the backchannel */
-               int backChannelMemoryPages = 
getMemoryManager().computeNumberOfPages(this.config.getRelativeBackChannelMemory());
-
-               /* allocate the memory available to the backchannel */
-               List<MemorySegment> segments = new ArrayList<MemorySegment>();
-               int segmentSize = getMemoryManager().getPageSize();
-               getMemoryManager().allocatePages(this, segments, 
backChannelMemoryPages);
-
-               /* instantiate the backchannel */
-               BlockingBackChannel backChannel = new BlockingBackChannel(new 
SerializedUpdateBuffer(segments, segmentSize,
-                       getIOManager()));
-
-               /* hand the backchannel over to the iteration tail */
-               Broker<BlockingBackChannel> broker = 
BlockingBackChannelBroker.instance();
-               broker.handIn(brokerKey(), backChannel);
-
-               return backChannel;
-       }
-       
-       private <BT> CompactingHashTable<BT> initCompactingHashTable() throws 
Exception { // builds the hash table for the solution set on pages taken from the memory manager
-               // get some memory
-               double hashjoinMemorySize = 
config.getRelativeSolutionSetMemory();
-               final ClassLoader userCodeClassLoader = 
getUserCodeClassLoader();
-
-               TypeSerializerFactory<BT> solutionTypeSerializerFactory = 
config.getSolutionSetSerializer(userCodeClassLoader);
-               TypeComparatorFactory<BT> solutionTypeComparatorFactory = 
config.getSolutionSetComparator(userCodeClassLoader);
-       
-               TypeSerializer<BT> solutionTypeSerializer = 
solutionTypeSerializerFactory.getSerializer(); // local variable; shadows the field of the same name
-               TypeComparator<BT> solutionTypeComparator = 
solutionTypeComparatorFactory.createComparator();
-
-               CompactingHashTable<BT> hashTable = null;
-               List<MemorySegment> memSegments = null;
-               boolean success = false; // set to true only after the table was constructed; drives the cleanup below
-               try {
-                       int numPages = 
getMemoryManager().computeNumberOfPages(hashjoinMemorySize);
-                       memSegments = 
getMemoryManager().allocatePages(getOwningNepheleTask(), numPages);
-                       hashTable = new 
CompactingHashTable<BT>(solutionTypeSerializer, solutionTypeComparator, 
memSegments);
-                       success = true;
-                       return hashTable;
-               } finally {
-                       if (!success) { // construction failed: close the table if it exists and release the pages
-                               if (hashTable != null) {
-                                       try {
-                                               hashTable.close();
-                                       } catch (Throwable t) { // cleanup is best effort; the original failure keeps propagating
-                                               log.error("Error closing the 
solution set hash table after unsuccessful creation.", t);
-                                       }
-                               }
-                               if (memSegments != null) {
-                                       try {
-                                               
getMemoryManager().release(memSegments);
-                                       } catch (Throwable t) { // logged only, so the original failure is not masked
-                                               log.error("Error freeing memory 
after error during solution set hash table creation.", t);
-                                       }
-                               }
-                       }
-               }
-       }
-       
-       private <BT> JoinHashMap<BT> initJoinHashMap() { // builds the object map that run() uses when the solution set is configured as unmanaged
-               TypeSerializerFactory<BT> solutionTypeSerializerFactory = 
config.getSolutionSetSerializer
-                               (getUserCodeClassLoader());
-               TypeComparatorFactory<BT> solutionTypeComparatorFactory = 
config.getSolutionSetComparator
-                               (getUserCodeClassLoader());
-       
-               TypeSerializer<BT> solutionTypeSerializer = 
solutionTypeSerializerFactory.getSerializer(); // local variable; shadows the field of the same name
-               TypeComparator<BT> solutionTypeComparator = 
solutionTypeComparatorFactory.createComparator();
-               
-               return new JoinHashMap<BT>(solutionTypeSerializer, 
solutionTypeComparator);
-       }
-       
-       private void readInitialSolutionSet(CompactingHashTable<X> solutionSet, 
MutableObjectIterator<X> solutionSetInput) throws IOException { // loads the initial solution set into the hash table
-               solutionSet.open(); // the table is opened before any record is inserted
-               solutionSet.buildTableWithUniqueKey(solutionSetInput); // presumably expects at most one record per key - confirm in CompactingHashTable
-       }
-       
-       private void readInitialSolutionSet(JoinHashMap<X> solutionSet, 
MutableObjectIterator<X> solutionSetInput) throws IOException { // loads the initial solution set into the object map
-               TypeSerializer<X> serializer = 
solutionTypeSerializer.getSerializer(); // reads the field solutionTypeSerializer, which run() assigns before calling this method
-               
-               X next;
-               while ((next = 
solutionSetInput.next(serializer.createInstance())) != null) { // a fresh instance is handed to every read; a null result ends the loop
-                       solutionSet.insertOrReplace(next);
-               }
-       }
-
-       private SuperstepBarrier initSuperstepBarrier() { // creates the barrier and subscribes it to the two event types read from the sync writer
-               SuperstepBarrier barrier = new 
SuperstepBarrier(getUserCodeClassLoader());
-               this.toSync.subscribeToEvent(barrier, 
AllWorkersDoneEvent.class);
-               this.toSync.subscribeToEvent(barrier, TerminationEvent.class);
-               return barrier;
-       }
-
-       @Override
-       public void run() throws Exception { // sets up the iteration, runs the superstep loop, then emits the final result
-               final String brokerKey = brokerKey();
-               final int workerIndex = 
getEnvironment().getIndexInSubtaskGroup();
-               
-               final boolean objectSolutionSet = 
config.isSolutionSetUnmanaged();
-
-               CompactingHashTable<X> solutionSet = null; // if workset 
iteration
-               JoinHashMap<X> solutionSetObjectMap = null; // if workset 
iteration with unmanaged solution set
-               
-               boolean waitForSolutionSetUpdate = 
config.getWaitForSolutionSetUpdate();
-               boolean isWorksetIteration = config.getIsWorksetIteration();
-
-               try {
-                       /* latch published through the broker; it is triggered at the end of every
-                        * superstep below, either to start the next superstep or to signal termination */
-                       SuperstepKickoffLatch nextStepKickoff = new 
SuperstepKickoffLatch();
-                       
SuperstepKickoffLatchBroker.instance().handIn(brokerKey, nextStepKickoff);
-                       
-                       BlockingBackChannel backChannel = initBackChannel(); // used further down to receive the result of each superstep
-                       SuperstepBarrier barrier = initSuperstepBarrier();
-                       SolutionSetUpdateBarrier solutionSetUpdateBarrier = 
null; // only created when waitForSolutionSetUpdate is set
-
-                       feedbackDataInput = 
config.getIterationHeadPartialSolutionOrWorksetInputIndex();
-                       feedbackTypeSerializer = 
this.getInputSerializer(feedbackDataInput);
-                       excludeFromReset(feedbackDataInput);
-
-                       int initialSolutionSetInput;
-                       if (isWorksetIteration) {
-                               initialSolutionSetInput = 
config.getIterationHeadSolutionSetInputIndex();
-                               solutionTypeSerializer = 
config.getSolutionSetSerializer(getUserCodeClassLoader());
-
-                               // setup the index for the solution set
-                               @SuppressWarnings("unchecked")
-                               MutableObjectIterator<X> solutionSetInput = 
(MutableObjectIterator<X>) 
createInputIterator(inputReaders[initialSolutionSetInput], 
solutionTypeSerializer);
-                               
-                               // read the initial solution set
-                               if (objectSolutionSet) {
-                                       solutionSetObjectMap = 
initJoinHashMap();
-                                       
readInitialSolutionSet(solutionSetObjectMap, solutionSetInput);
-                                       
SolutionSetBroker.instance().handIn(brokerKey, solutionSetObjectMap);
-                               } else {
-                                       solutionSet = initCompactingHashTable();
-                                       readInitialSolutionSet(solutionSet, 
solutionSetInput);
-                                       
SolutionSetBroker.instance().handIn(brokerKey, solutionSet);
-                               }
-
-                               if (waitForSolutionSetUpdate) {
-                                       solutionSetUpdateBarrier = new 
SolutionSetUpdateBarrier();
-                                       
SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, 
solutionSetUpdateBarrier);
-                               }
-                       }
-                       else {
-                               // bulk iteration case
-                               @SuppressWarnings("unchecked")
-                               TypeSerializerFactory<X> solSer = 
(TypeSerializerFactory<X>) feedbackTypeSerializer; // in a bulk iteration the feedback serializer is reused for solution records
-                               solutionTypeSerializer = solSer;
-                               
-                               // = termination Criterion tail
-                               if (waitForSolutionSetUpdate) {
-                                       solutionSetUpdateBarrier = new 
SolutionSetUpdateBarrier();
-                                       
SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, 
solutionSetUpdateBarrier);
-                               }
-                       }
-
-                       // instantiate all aggregators and register them at the 
iteration global registry
-                       RuntimeAggregatorRegistry aggregatorRegistry = new 
RuntimeAggregatorRegistry(config.getIterationAggregators
-                                       (getUserCodeClassLoader()));
-                       IterationAggregatorBroker.instance().handIn(brokerKey, 
aggregatorRegistry);
-
-                       DataInputView superstepResult = null; // read end of the backchannel after the most recent superstep
-
-                       while (this.running && !terminationRequested()) { // one pass of this loop is one superstep
-
-                               if (log.isInfoEnabled()) {
-                                       log.info(formatLogString("starting 
iteration [" + currentIteration() + "]"));
-                               }
-
-                               barrier.setup();
-
-                               if (waitForSolutionSetUpdate) {
-                                       solutionSetUpdateBarrier.setup();
-                               }
-
-                               if (!inFirstIteration()) { // from the second superstep on, the feedback input is the previous superstep result
-                                       
feedBackSuperstepResult(superstepResult);
-                               }
-
-                               super.run(); // presumably executes the wrapped driver for this superstep (defined in the superclass - confirm)
-
-                               // signal to connected tasks that we are done 
with the superstep
-                               sendEndOfSuperstepToAllIterationOutputs();
-
-                               if (waitForSolutionSetUpdate) {
-                                       
solutionSetUpdateBarrier.waitForSolutionSetUpdate();
-                               }
-
-                               // blocking call to wait for the result
-                               superstepResult = 
backChannel.getReadEndAfterSuperstepEnded();
-                               if (log.isInfoEnabled()) {
-                                       log.info(formatLogString("finishing 
iteration [" + currentIteration() + "]"));
-                               }
-
-                               sendEventToSync(new 
WorkerDoneEvent(workerIndex, aggregatorRegistry.getAllAggregators())); // reports this worker and its aggregators to the sync task
-
-                               if (log.isInfoEnabled()) {
-                                       log.info(formatLogString("waiting for 
other workers in iteration [" + currentIteration() + "]"));
-                               }
-
-                               barrier.waitForOtherWorkers();
-
-                               if (barrier.terminationSignaled()) {
-                                       if (log.isInfoEnabled()) {
-                                               log.info(formatLogString("head 
received termination request in iteration ["
-                                                       + currentIteration()
-                                                       + "]"));
-                                       }
-                                       requestTermination(); // presumably makes terminationRequested() true so the loop ends - confirm in the superclass
-                                       nextStepKickoff.signalTermination();
-                               } else {
-                                       incrementIterationCounter();
-
-                                       String[] globalAggregateNames = 
barrier.getAggregatorNames();
-                                       Value[] globalAggregates = 
barrier.getAggregates();
-                                       
aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, 
globalAggregates);
-                                       
-                                       nextStepKickoff.triggerNextSuperstep();
-                               }
-                       }
-
-                       if (log.isInfoEnabled()) {
-                               log.info(formatLogString("streaming out final 
result after [" + currentIteration() + "] iterations"));
-                       }
-
-                       if (isWorksetIteration) { // workset iteration: the final result is the content of the solution set
-                               if (objectSolutionSet) {
-                                       
streamSolutionSetToFinalOutput(solutionSetObjectMap);
-                               } else {
-                                       
streamSolutionSetToFinalOutput(solutionSet);
-                               }
-                       } else { // bulk iteration: the final result is the last superstep result from the backchannel
-                               streamOutFinalOutputBulk(new 
InputViewIterator<X>(superstepResult, 
this.solutionTypeSerializer.getSerializer()));
-                       }
-
-                       this.finalOutputCollector.close();
-
-               } finally {
-                       // make sure we unregister everything from the broker:
-                       // - backchannel
-                       // - aggregator registry
-                       // - solution set index
-                       IterationAggregatorBroker.instance().remove(brokerKey);
-                       BlockingBackChannelBroker.instance().remove(brokerKey);
-                       
SuperstepKickoffLatchBroker.instance().remove(brokerKey);
-                       SolutionSetBroker.instance().remove(brokerKey);
-                       
SolutionSetUpdateBarrierBroker.instance().remove(brokerKey);
-
-                       if (solutionSet != null) { // only the hash table variant is closed here; the object map has no close call
-                               solutionSet.close();
-                       }
-               }
-       }
-
-       private void streamOutFinalOutputBulk(MutableObjectIterator<X> results) 
throws IOException { // forwards every record of the iterator to the final output collector
-               final Collector<X> out = this.finalOutputCollector;
-               X record = 
this.solutionTypeSerializer.getSerializer().createInstance(); // initial instance passed to next(); each later call gets the previously returned record
-
-               while ((record = results.next(record)) != null) { // a null result ends the stream
-                       out.collect(record);
-               }
-       }
-       
-       private void streamSolutionSetToFinalOutput(CompactingHashTable<X> 
hashTable) throws IOException { // emits every entry of the solution set hash table to the final output collector
-               final MutableObjectIterator<X> results = 
hashTable.getEntryIterator();
-               final Collector<X> output = this.finalOutputCollector;
-               X record = 
solutionTypeSerializer.getSerializer().createInstance(); // initial instance passed to next(); each later call gets the previously returned record
-
-               while ((record = results.next(record)) != null) { // a null result ends the iteration
-                       output.collect(record);
-               }
-       }
-       
-       @SuppressWarnings("unchecked")
-       private void streamSolutionSetToFinalOutput(JoinHashMap<X> soluionSet) 
throws IOException { // emits every value of the object-based solution set to the final output collector
-               final Collector<X> output = this.finalOutputCollector;
-               for (Object e : soluionSet.values()) { // values are iterated as Object, hence the unchecked cast below
-                       output.collect((X) e);
-               }
-       }
-
-       private void feedBackSuperstepResult(DataInputView superstepResult) { // replaces the feedback input with an iterator over the given superstep result
-               this.inputs[this.feedbackDataInput] =
-                       new InputViewIterator<Y>(superstepResult, 
this.feedbackTypeSerializer.getSerializer());
-       }
-
-       private void sendEndOfSuperstepToAllIterationOutputs() throws 
IOException, InterruptedException { // sends end-of-superstep on every writer in eventualOutputs
-               if (log.isDebugEnabled()) {
-                       log.debug(formatLogString("Sending end-of-superstep to 
all iteration outputs."));
-               }
-
-               for (RecordWriter<?> eventualOutput : this.eventualOutputs) { // final-result writers and the sync writer are kept elsewhere and are not part of this list
-                       eventualOutput.sendEndOfSuperstep();
-               }
-       }
-
-       private void sendEventToSync(WorkerDoneEvent event) throws IOException, 
InterruptedException { // writes the given event to all channels of the sync writer
-               if (log.isInfoEnabled()) {
-                       log.info(formatLogString("sending " + 
WorkerDoneEvent.class.getSimpleName() + " to sync"));
-               }
-               this.toSync.writeEventToAllChannels(event);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
new file mode 100644
index 0000000..c6268f4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java
@@ -0,0 +1,441 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.iterative.task;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.operators.Driver;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.operators.util.JoinHashMap;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
+import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
+import org.apache.flink.runtime.iterative.concurrent.Broker;
+import org.apache.flink.runtime.iterative.concurrent.IterationAggregatorBroker;
+import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
+import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
+import 
org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepBarrier;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
+import 
org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
+import org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
+import org.apache.flink.runtime.iterative.event.TerminationEvent;
+import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
+import org.apache.flink.runtime.iterative.io.SerializedUpdateBuffer;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.hash.CompactingHashTable;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * The head is responsible for coordinating an iteration and can run a
+ * {@link Driver} inside. It will read
+ * the initial input and establish a {@link BlockingBackChannel} to the 
iteration's tail. After successfully processing
+ * the input, it will send EndOfSuperstep events to its outputs. It must also 
be connected to a
+ * synchronization task and after each superstep, it will wait
+ * until it receives an {@link AllWorkersDoneEvent} from the sync, which 
signals that all other heads have also finished
+ * their iteration. Starting with
+ * the second iteration, the input for the head is the output of the tail, 
transmitted through the backchannel. Once the
+ * iteration is done, the head
+ * will send a {@link TerminationEvent} to all its connected tasks, signaling
+ * them to shut down.
+ * <p>
+ * Assumption on the ordering of the outputs: - The first n output gates write 
to channels that go to the tasks of the
+ * step function. - The next m output gates go to the tasks that consume the
+ * final solution. - The last output gate
+ * connects to the synchronization task.
+ * 
+ * @param <X>
+ *        The type of the bulk partial solution / solution set and the final 
output.
+ * @param <Y>
+ *        The type of the feed-back data set (bulk partial solution / 
workset). For bulk iterations, {@code Y} is the
+ *        same as {@code X}
+ */
+public class IterationHeadTask<X, Y, S extends Function, OT> extends 
AbstractIterativeTask<S, OT> {
+
+       private static final Logger log = 
LoggerFactory.getLogger(IterationHeadTask.class);
+
+       private Collector<X> finalOutputCollector;
+
+       private TypeSerializerFactory<Y> feedbackTypeSerializer;
+
+       private TypeSerializerFactory<X> solutionTypeSerializer;
+
+       private ResultPartitionWriter toSync;
+
+       private int feedbackDataInput; // workset or bulk partial solution
+
+       // 
--------------------------------------------------------------------------------------------
+
+       @Override
+       protected int getNumTaskInputs() {
+               // this task has an additional input in the workset case for 
the initial solution set
+               boolean isWorkset = config.getIsWorksetIteration();
+               return driver.getNumberOfInputs() + (isWorkset ? 1 : 0);
+       }
+
+       /**
+        * Sets up the outputs of the head in three steps: the superclass creates the writers into
+        * the step function, then the writers and collector for the final solution are created,
+        * and finally the writer to the synchronization task is looked up.
+        *
+        * @throws Exception if the superclass setup fails, or if the number of step-function
+        *         writers plus final-result writers differs from the configured index of the
+        *         sync output
+        */
+       @Override
+       protected void initOutputs() throws Exception {
+               // regular outputs first (the ones into the step function)
+               super.initOutputs();
+
+               // the step-function outputs exist now; add the outputs for the final solution
+               final List<RecordWriter<?>> finalResultWriters = new ArrayList<RecordWriter<?>>();
+               final TaskConfig finalResultConfig = this.config.getIterationHeadFinalOutputConfig();
+               final ClassLoader classLoader = getUserCodeClassLoader();
+               final AccumulatorRegistry.Reporter accumulatorReporter =
+                               getEnvironment().getAccumulatorRegistry().getReadWriteReporter();
+               this.finalOutputCollector = BatchTask.getOutputCollector(
+                               this,
+                               finalResultConfig,
+                               classLoader,
+                               finalResultWriters,
+                               config.getNumOutputs(),
+                               finalResultConfig.getNumOutputs(),
+                               accumulatorReporter);
+
+               // sanity check: the sync gate must come right after all other writers
+               final int expectedSyncIndex = this.eventualOutputs.size() + finalResultWriters.size();
+               final int syncGateIndex = this.config.getIterationHeadIndexOfSyncOutput();
+               if (expectedSyncIndex != syncGateIndex) {
+                       throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
+               }
+
+               // the setup is consistent, so the sync gate can be instantiated
+               this.toSync = getEnvironment().getWriter(syncGateIndex);
+       }
+
+       /**
+        * the iteration head prepares the backchannel: it allocates memory, 
instantiates a {@link BlockingBackChannel} and
+        * hands it to the iteration tail via a {@link Broker} singleton
+        **/
+       private BlockingBackChannel initBackChannel() throws Exception {
+
+               /* get the size of the memory available to the backchannel */
+               int backChannelMemoryPages = 
getMemoryManager().computeNumberOfPages(this.config.getRelativeBackChannelMemory());
+
+               /* allocate the memory available to the backchannel */
+               List<MemorySegment> segments = new ArrayList<MemorySegment>();
+               int segmentSize = getMemoryManager().getPageSize();
+               getMemoryManager().allocatePages(this, segments, 
backChannelMemoryPages);
+
+               /* instantiate the backchannel */
+               BlockingBackChannel backChannel = new BlockingBackChannel(new 
SerializedUpdateBuffer(segments, segmentSize,
+                       getIOManager()));
+
+               /* hand the backchannel over to the iteration tail */
+               Broker<BlockingBackChannel> broker = 
BlockingBackChannelBroker.instance();
+               broker.handIn(brokerKey(), backChannel);
+
+               return backChannel;
+       }
+       
+       /**
+        * Creates the {@link CompactingHashTable} for the solution set, using the solution set
+        * serializer and comparator from the task configuration and the share of memory
+        * configured for the solution set.
+        * <p>
+        * If creation does not complete, the table (if one exists) is closed and any allocated
+        * memory is released before the failure propagates; errors during that cleanup are
+        * logged and not rethrown.
+        *
+        * @param <BT> the type of the records in the solution set
+        * @return the created hash table
+        * @throws Exception if allocating the memory or creating the table fails
+        */
+       private <BT> CompactingHashTable<BT> initCompactingHashTable() throws Exception {
+               // relative amount of memory reserved for the solution set
+               final double relativeMemory = config.getRelativeSolutionSetMemory();
+               final ClassLoader classLoader = getUserCodeClassLoader();
+
+               final TypeSerializerFactory<BT> serializerFactory = config.getSolutionSetSerializer(classLoader);
+               final TypeComparatorFactory<BT> comparatorFactory = config.getSolutionSetComparator(classLoader);
+
+               final TypeSerializer<BT> serializer = serializerFactory.getSerializer();
+               final TypeComparator<BT> comparator = comparatorFactory.createComparator();
+
+               CompactingHashTable<BT> table = null;
+               List<MemorySegment> pages = null;
+               // stays true until the table has been created completely
+               boolean failed = true;
+               try {
+                       final int numPages = getMemoryManager().computeNumberOfPages(relativeMemory);
+                       pages = getMemoryManager().allocatePages(getOwningNepheleTask(), numPages);
+                       table = new CompactingHashTable<BT>(serializer, comparator, pages);
+                       failed = false;
+                       return table;
+               } finally {
+                       if (failed) {
+                               // best-effort cleanup: never mask the original failure
+                               if (table != null) {
+                                       try {
+                                               table.close();
+                                       } catch (Throwable t) {
+                                               log.error("Error closing the solution set hash table after unsuccessful creation.", t);
+                                       }
+                               }
+                               if (pages != null) {
+                                       try {
+                                               getMemoryManager().release(pages);
+                                       } catch (Throwable t) {
+                                               log.error("Error freeing memory after error during solution set hash table creation.", t);
+                                       }
+                               }
+                       }
+               }
+       }
+       
+       private <BT> JoinHashMap<BT> initJoinHashMap() {
+               TypeSerializerFactory<BT> solutionTypeSerializerFactory = 
config.getSolutionSetSerializer
+                               (getUserCodeClassLoader());
+               TypeComparatorFactory<BT> solutionTypeComparatorFactory = 
config.getSolutionSetComparator
+                               (getUserCodeClassLoader());
+       
+               TypeSerializer<BT> solutionTypeSerializer = 
solutionTypeSerializerFactory.getSerializer();
+               TypeComparator<BT> solutionTypeComparator = 
solutionTypeComparatorFactory.createComparator();
+               
+               return new JoinHashMap<BT>(solutionTypeSerializer, 
solutionTypeComparator);
+       }
+       
+       /**
+        * Loads the initial solution set into the given hash table: the table is opened and then
+        * built from the input via {@code buildTableWithUniqueKey}.
+        *
+        * @param solutionSet the hash table to fill
+        * @param solutionSetInput the iterator over the records of the initial solution set
+        * @throws IOException if opening or building the table fails
+        */
+       private void readInitialSolutionSet(
+                       CompactingHashTable<X> solutionSet,
+                       MutableObjectIterator<X> solutionSetInput) throws IOException {
+               // open first, then bulk-build from the input
+               solutionSet.open();
+               solutionSet.buildTableWithUniqueKey(solutionSetInput);
+       }
+       
+       private void readInitialSolutionSet(JoinHashMap<X> solutionSet, 
MutableObjectIterator<X> solutionSetInput) throws IOException {
+               TypeSerializer<X> serializer = 
solutionTypeSerializer.getSerializer();
+               
+               X next;
+               while ((next = 
solutionSetInput.next(serializer.createInstance())) != null) {
+                       solutionSet.insertOrReplace(next);
+               }
+       }
+
+       private SuperstepBarrier initSuperstepBarrier() {
+               SuperstepBarrier barrier = new 
SuperstepBarrier(getUserCodeClassLoader());
+               this.toSync.subscribeToEvent(barrier, 
AllWorkersDoneEvent.class);
+               this.toSync.subscribeToEvent(barrier, TerminationEvent.class);
+               return barrier;
+       }
+
+       @Override
+       public void run() throws Exception {
+               final String brokerKey = brokerKey(); // key under which all shared iteration state is handed to (and removed from) the brokers
+               final int workerIndex = 
getEnvironment().getIndexInSubtaskGroup();
+               
+               final boolean objectSolutionSet = 
config.isSolutionSetUnmanaged();
+
+               CompactingHashTable<X> solutionSet = null; // if workset 
iteration
+               JoinHashMap<X> solutionSetObjectMap = null; // if workset 
iteration with unmanaged solution set
+               
+               boolean waitForSolutionSetUpdate = 
config.getWaitForSolutionSetUpdate();
+               boolean isWorksetIteration = config.getIsWorksetIteration(); // false means bulk iteration
+
+               try {
+                       /* used for receiving the current iteration result from 
iteration tail */
+                       SuperstepKickoffLatch nextStepKickoff = new 
SuperstepKickoffLatch();
+                       
SuperstepKickoffLatchBroker.instance().handIn(brokerKey, nextStepKickoff);
+                       
+                       BlockingBackChannel backChannel = initBackChannel(); // read further down to obtain the result of each superstep
+                       SuperstepBarrier barrier = initSuperstepBarrier(); // subscribed to AllWorkersDoneEvent and TerminationEvent on the sync writer
+                       SolutionSetUpdateBarrier solutionSetUpdateBarrier = 
null;
+
+                       feedbackDataInput = 
config.getIterationHeadPartialSolutionOrWorksetInputIndex();
+                       feedbackTypeSerializer = 
this.getInputSerializer(feedbackDataInput);
+                       excludeFromReset(feedbackDataInput);
+
+                       int initialSolutionSetInput; // only assigned in the workset case
+                       if (isWorksetIteration) {
+                               initialSolutionSetInput = 
config.getIterationHeadSolutionSetInputIndex();
+                               solutionTypeSerializer = 
config.getSolutionSetSerializer(getUserCodeClassLoader());
+
+                               // setup the index for the solution set
+                               @SuppressWarnings("unchecked")
+                               MutableObjectIterator<X> solutionSetInput = 
(MutableObjectIterator<X>) 
createInputIterator(inputReaders[initialSolutionSetInput], 
solutionTypeSerializer);
+                               
+                               // read the initial solution set
+                               if (objectSolutionSet) {
+                                       solutionSetObjectMap = 
initJoinHashMap();
+                                       
readInitialSolutionSet(solutionSetObjectMap, solutionSetInput);
+                                       
SolutionSetBroker.instance().handIn(brokerKey, solutionSetObjectMap);
+                               } else {
+                                       solutionSet = initCompactingHashTable();
+                                       readInitialSolutionSet(solutionSet, 
solutionSetInput);
+                                       
SolutionSetBroker.instance().handIn(brokerKey, solutionSet);
+                               }
+
+                               if (waitForSolutionSetUpdate) {
+                                       solutionSetUpdateBarrier = new 
SolutionSetUpdateBarrier();
+                                       
SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, 
solutionSetUpdateBarrier);
+                               }
+                       }
+                       else {
+                               // bulk iteration case
+                               @SuppressWarnings("unchecked")
+                               TypeSerializerFactory<X> solSer = 
(TypeSerializerFactory<X>) feedbackTypeSerializer;
+                               solutionTypeSerializer = solSer; // in the bulk case the feedback serializer doubles as the solution serializer
+                               
+                               // = termination Criterion tail
+                               if (waitForSolutionSetUpdate) {
+                                       solutionSetUpdateBarrier = new 
SolutionSetUpdateBarrier();
+                                       
SolutionSetUpdateBarrierBroker.instance().handIn(brokerKey, 
solutionSetUpdateBarrier);
+                               }
+                       }
+
+                       // instantiate all aggregators and register them at the 
iteration global registry
+                       RuntimeAggregatorRegistry aggregatorRegistry = new 
RuntimeAggregatorRegistry(config.getIterationAggregators
+                                       (getUserCodeClassLoader()));
+                       IterationAggregatorBroker.instance().handIn(brokerKey, 
aggregatorRegistry);
+
+                       DataInputView superstepResult = null; // result of the latest superstep; stays null if the loop below never runs
+
+                       while (this.running && !terminationRequested()) {
+
+                               if (log.isInfoEnabled()) {
+                                       log.info(formatLogString("starting 
iteration [" + currentIteration() + "]"));
+                               }
+
+                               barrier.setup();
+
+                               if (waitForSolutionSetUpdate) {
+                                       solutionSetUpdateBarrier.setup();
+                               }
+
+                               if (!inFirstIteration()) {
+                                       
feedBackSuperstepResult(superstepResult);
+                               }
+
+                               super.run(); // the work of the superstep itself is done by the superclass
+
+                               // signal to connected tasks that we are done 
with the superstep
+                               sendEndOfSuperstepToAllIterationOutputs();
+
+                               if (waitForSolutionSetUpdate) {
+                                       
solutionSetUpdateBarrier.waitForSolutionSetUpdate();
+                               }
+
+                               // blocking call to wait for the result
+                               superstepResult = 
backChannel.getReadEndAfterSuperstepEnded();
+                               if (log.isInfoEnabled()) {
+                                       log.info(formatLogString("finishing 
iteration [" + currentIteration() + "]"));
+                               }
+
+                               sendEventToSync(new 
WorkerDoneEvent(workerIndex, aggregatorRegistry.getAllAggregators()));
+
+                               if (log.isInfoEnabled()) {
+                                       log.info(formatLogString("waiting for 
other workers in iteration [" + currentIteration() + "]"));
+                               }
+
+                               barrier.waitForOtherWorkers();
+
+                               if (barrier.terminationSignaled()) {
+                                       if (log.isInfoEnabled()) {
+                                               log.info(formatLogString("head 
received termination request in iteration ["
+                                                       + currentIteration()
+                                                       + "]"));
+                                       }
+                                       requestTermination(); // ends the loop above at its next condition check
+                                       nextStepKickoff.signalTermination();
+                               } else {
+                                       incrementIterationCounter();
+
+                                       String[] globalAggregateNames = 
barrier.getAggregatorNames();
+                                       Value[] globalAggregates = 
barrier.getAggregates();
+                                       
aggregatorRegistry.updateGlobalAggregatesAndReset(globalAggregateNames, 
globalAggregates);
+                                       
+                                       nextStepKickoff.triggerNextSuperstep();
+                               }
+                       }
+
+                       if (log.isInfoEnabled()) {
+                               log.info(formatLogString("streaming out final 
result after [" + currentIteration() + "] iterations"));
+                       }
+
+                       if (isWorksetIteration) {
+                               if (objectSolutionSet) {
+                                       
streamSolutionSetToFinalOutput(solutionSetObjectMap);
+                               } else {
+                                       
streamSolutionSetToFinalOutput(solutionSet);
+                               }
+                       } else {
+                               streamOutFinalOutputBulk(new 
InputViewIterator<X>(superstepResult, 
this.solutionTypeSerializer.getSerializer()));
+                       }
+
+                       this.finalOutputCollector.close();
+
+               } finally {
+                       // make sure we unregister everything from the broker:
+                       // - backchannel
+                       // - aggregator registry
+                       // - solution set index
+                       IterationAggregatorBroker.instance().remove(brokerKey);
+                       BlockingBackChannelBroker.instance().remove(brokerKey);
+                       
SuperstepKickoffLatchBroker.instance().remove(brokerKey);
+                       SolutionSetBroker.instance().remove(brokerKey);
+                       
SolutionSetUpdateBarrierBroker.instance().remove(brokerKey);
+
+                       if (solutionSet != null) { // only the CompactingHashTable is closed here; the object map is not
+                               solutionSet.close();
+                       }
+               }
+       }
+
+       /**
+        * Emits all records of the given iterator to the final output. A single record instance,
+        * created by the solution serializer, is passed to the iterator for reuse.
+        *
+        * @param results the iterator over the final result of a bulk iteration
+        * @throws IOException if reading from the iterator fails
+        */
+       private void streamOutFinalOutputBulk(MutableObjectIterator<X> results) throws IOException {
+               final Collector<X> collector = this.finalOutputCollector;
+
+               X reuse = this.solutionTypeSerializer.getSerializer().createInstance();
+               for (reuse = results.next(reuse); reuse != null; reuse = results.next(reuse)) {
+                       collector.collect(reuse);
+               }
+       }
+       
+       /**
+        * Emits all entries of the solution set hash table to the final output. A single record
+        * instance, created by the solution serializer, is passed to the entry iterator for reuse.
+        *
+        * @param hashTable the hash table holding the solution set
+        * @throws IOException if reading the entries fails
+        */
+       private void streamSolutionSetToFinalOutput(CompactingHashTable<X> hashTable) throws IOException {
+               final MutableObjectIterator<X> entries = hashTable.getEntryIterator();
+               final Collector<X> collector = this.finalOutputCollector;
+
+               X reuse = solutionTypeSerializer.getSerializer().createInstance();
+               for (reuse = entries.next(reuse); reuse != null; reuse = entries.next(reuse)) {
+                       collector.collect(reuse);
+               }
+       }
+       
+       /**
+        * Emits every value held in the object-based solution set map to the final output.
+        *
+        * @param solutionSet the map holding the solution set
+        * @throws IOException part of the declared signature; no statement visible here raises
+        *         it directly
+        */
+       @SuppressWarnings("unchecked")
+       private void streamSolutionSetToFinalOutput(JoinHashMap<X> solutionSet) throws IOException {
+               final Collector<X> output = this.finalOutputCollector;
+               // the values are iterated as Object and cast back to X, hence the suppressed warning
+               for (Object entry : solutionSet.values()) {
+                       output.collect((X) entry);
+               }
+       }
+
+       private void feedBackSuperstepResult(DataInputView superstepResult) {
+               this.inputs[this.feedbackDataInput] =
+                       new InputViewIterator<Y>(superstepResult, 
this.feedbackTypeSerializer.getSerializer());
+       }
+
+       private void sendEndOfSuperstepToAllIterationOutputs() throws 
IOException, InterruptedException {
+               if (log.isDebugEnabled()) {
+                       log.debug(formatLogString("Sending end-of-superstep to 
all iteration outputs."));
+               }
+
+               for (RecordWriter<?> eventualOutput : this.eventualOutputs) {
+                       eventualOutput.sendEndOfSuperstep();
+               }
+       }
+
+       private void sendEventToSync(WorkerDoneEvent event) throws IOException, 
InterruptedException {
+               if (log.isInfoEnabled()) {
+                       log.info(formatLogString("sending " + 
WorkerDoneEvent.class.getSimpleName() + " to sync"));
+               }
+               this.toSync.writeEventToAllChannels(event);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
deleted file mode 100644
index e7801e4..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.iterative.task;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
-import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
-import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
-import 
org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
-import org.apache.flink.runtime.iterative.event.TerminationEvent;
-import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * An intermediate iteration task, which runs a Driver}inside.
- * <p>
- * It will propagate {@link EndOfSuperstepEvent}s and {@link 
TerminationEvent}s to it's connected tasks. Furthermore
- * intermediate tasks can also update the iteration state, either the workset 
or the solution set.
- * <p>
- * If the iteration state is updated, the output of this task will be send 
back to the {@link IterationHeadPactTask} via
- * a {@link BlockingBackChannel} for the workset -XOR- a eHashTable for the 
solution set. In this case
- * this task must be scheduled on the same instance as the head.
- */
-public class IterationIntermediatePactTask<S extends Function, OT> extends 
AbstractIterativePactTask<S, OT> {
-
-       private static final Logger log = 
LoggerFactory.getLogger(IterationIntermediatePactTask.class);
-
-       private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
-
-       @Override
-       protected void initialize() throws Exception {
-               super.initialize();
-
-               // set the last output collector of this task to reflect the 
iteration intermediate state update
-               // a) workset update
-               // b) solution set update
-               // c) none
-
-               Collector<OT> delegate = getLastOutputCollector();
-               if (isWorksetUpdate) {
-                       // sanity check: we should not have a solution set and 
workset update at the same time
-                       // in an intermediate task
-                       if (isSolutionSetUpdate) {
-                               throw new IllegalStateException("Plan bug: 
Intermediate task performs workset and solutions set update.");
-                       }
-                       
-                       Collector<OT> outputCollector = 
createWorksetUpdateOutputCollector(delegate);
-
-                       // we need the WorksetUpdateOutputCollector separately 
to count the collected elements
-                       if (isWorksetIteration) {
-                               worksetUpdateOutputCollector = 
(WorksetUpdateOutputCollector<OT>) outputCollector;
-                       }
-
-                       setLastOutputCollector(outputCollector);
-               } else if (isSolutionSetUpdate) {
-                       
setLastOutputCollector(createSolutionSetUpdateOutputCollector(delegate));
-               }
-       }
-
-       @Override
-       public void run() throws Exception {
-               
-               SuperstepKickoffLatch nextSuperstepLatch = 
SuperstepKickoffLatchBroker.instance().get(brokerKey());
-
-               while (this.running && !terminationRequested()) {
-
-                       if (log.isInfoEnabled()) {
-                               log.info(formatLogString("starting iteration [" 
+ currentIteration() + "]"));
-                       }
-
-                       super.run();
-
-                       // check if termination was requested
-                       verifyEndOfSuperstepState();
-
-                       if (isWorksetUpdate && isWorksetIteration) {
-                               long numCollected = 
worksetUpdateOutputCollector.getElementsCollectedAndReset();
-                               worksetAggregator.aggregate(numCollected);
-                       }
-                       
-                       if (log.isInfoEnabled()) {
-                               log.info(formatLogString("finishing iteration 
[" + currentIteration() + "]"));
-                       }
-                       
-                       // let the successors know that the end of this 
superstep data is reached
-                       sendEndOfSuperstep();
-                       
-                       if (isWorksetUpdate) {
-                               // notify iteration head if responsible for 
workset update
-                               worksetBackChannel.notifyOfEndOfSuperstep();
-                       }
-                       
-                       boolean terminated = 
nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
-
-                       if (terminated) {
-                               requestTermination();
-                       }
-                       else {
-                               incrementIterationCounter();
-                       }
-               }
-       }
-
-       private void sendEndOfSuperstep() throws IOException, 
InterruptedException {
-               for (RecordWriter eventualOutput : this.eventualOutputs) {
-                       eventualOutput.sendEndOfSuperstep();
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
new file mode 100644
index 0000000..60f0dcf
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediateTask.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.iterative.task;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
+import 
org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
+import org.apache.flink.runtime.iterative.event.TerminationEvent;
+import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
+import org.apache.flink.util.Collector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * An intermediate iteration task, which runs a {@link org.apache.flink.runtime.operators.Driver} inside.
+ * <p>
+ * It will propagate {@link EndOfSuperstepEvent}s and {@link 
TerminationEvent}s to its connected tasks. Furthermore
+ * intermediate tasks can also update the iteration state, either the workset 
or the solution set.
+ * <p>
+ * If the iteration state is updated, the output of this task will be sent 
back to the {@link IterationHeadTask} via
+ * a {@link BlockingBackChannel} for the workset -XOR- a HashTable for the 
solution set. In this case
+ * this task must be scheduled on the same instance as the head.
+ */
+public class IterationIntermediateTask<S extends Function, OT> extends 
AbstractIterativeTask<S, OT> {
+
+       private static final Logger log = 
LoggerFactory.getLogger(IterationIntermediateTask.class);
+
+       private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
+
+       @Override
+       protected void initialize() throws Exception {
+               super.initialize();
+
+               // set the last output collector of this task to reflect the 
iteration intermediate state update
+               // a) workset update
+               // b) solution set update
+               // c) none
+
+               Collector<OT> delegate = getLastOutputCollector();
+               if (isWorksetUpdate) {
+                       // sanity check: we should not have a solution set and 
workset update at the same time
+                       // in an intermediate task
+                       if (isSolutionSetUpdate) {
+                               throw new IllegalStateException("Plan bug: 
Intermediate task performs workset and solutions set update.");
+                       }
+                       
+                       Collector<OT> outputCollector = 
createWorksetUpdateOutputCollector(delegate);
+
+                       // we need the WorksetUpdateOutputCollector separately 
to count the collected elements
+                       if (isWorksetIteration) {
+                               worksetUpdateOutputCollector = 
(WorksetUpdateOutputCollector<OT>) outputCollector;
+                       }
+
+                       setLastOutputCollector(outputCollector);
+               } else if (isSolutionSetUpdate) {
+                       
setLastOutputCollector(createSolutionSetUpdateOutputCollector(delegate));
+               }
+       }
+
+       @Override
+       public void run() throws Exception {
+               
+               SuperstepKickoffLatch nextSuperstepLatch = 
SuperstepKickoffLatchBroker.instance().get(brokerKey());
+
+               while (this.running && !terminationRequested()) {
+
+                       if (log.isInfoEnabled()) {
+                               log.info(formatLogString("starting iteration [" 
+ currentIteration() + "]"));
+                       }
+
+                       super.run();
+
+                       // check if termination was requested
+                       verifyEndOfSuperstepState();
+
+                       if (isWorksetUpdate && isWorksetIteration) {
+                               long numCollected = 
worksetUpdateOutputCollector.getElementsCollectedAndReset();
+                               worksetAggregator.aggregate(numCollected);
+                       }
+                       
+                       if (log.isInfoEnabled()) {
+                               log.info(formatLogString("finishing iteration 
[" + currentIteration() + "]"));
+                       }
+                       
+                       // let the successors know that the end of this 
superstep data is reached
+                       sendEndOfSuperstep();
+                       
+                       if (isWorksetUpdate) {
+                               // notify iteration head if responsible for 
workset update
+                               worksetBackChannel.notifyOfEndOfSuperstep();
+                       }
+                       
+                       boolean terminated = 
nextSuperstepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+
+                       if (terminated) {
+                               requestTermination();
+                       }
+                       else {
+                               incrementIterationCounter();
+                       }
+               }
+       }
+
+       private void sendEndOfSuperstep() throws IOException, 
InterruptedException {
+               for (RecordWriter eventualOutput : this.eventualOutputs) {
+                       eventualOutput.sendEndOfSuperstep();
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
index fed0a17..a85e662 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.types.IntValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,7 +37,6 @@ import 
org.apache.flink.runtime.iterative.event.AllWorkersDoneEvent;
 import org.apache.flink.runtime.iterative.event.TerminationEvent;
 import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.operators.RegularPactTask;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.types.Value;
 
@@ -204,7 +204,7 @@ public class IterationSynchronizationSinkTask extends 
AbstractInvokable implemen
        }
 
        private String formatLogString(String message) {
-               return RegularPactTask.constructLogString(message, 
getEnvironment().getTaskName(), this);
+               return BatchTask.constructLogString(message, 
getEnvironment().getTaskName(), this);
        }
        
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
deleted file mode 100644
index 159d3f2..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.iterative.task;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
-import 
org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
-import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
-import 
org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
-import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
-import org.apache.flink.util.Collector;
-
-/**
- * An iteration tail, which runs a driver inside.
- * <p>
- * If the iteration state is updated, the output of this task will be send 
back to the {@link IterationHeadPactTask} via
- * a BackChannel for the workset -OR- a HashTable for the solution set. 
Therefore this
- * task must be scheduled on the same instance as the head. It's also possible 
for the tail to update *both* the workset
- * and the solution set.
- * <p>
- * If there is a separate solution set tail, the iteration head has to make 
sure to wait for it to finish.
- */
-public class IterationTailPactTask<S extends Function, OT> extends 
AbstractIterativePactTask<S, OT> {
-
-       private static final Logger log = 
LoggerFactory.getLogger(IterationTailPactTask.class);
-
-       private SolutionSetUpdateBarrier solutionSetUpdateBarrier;
-
-       private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
-       
-
-       @Override
-       protected void initialize() throws Exception {
-               super.initialize();
-
-               // sanity check: the tail has to update either the workset or 
the solution set
-               if (!isWorksetUpdate && !isSolutionSetUpdate) {
-                       throw new RuntimeException("The iteration tail doesn't 
update workset or the solution set.");
-               }
-
-               // set the last output collector of this task to reflect the 
iteration tail state update:
-               // a) workset update,
-               // b) solution set update, or
-               // c) merged workset and solution set update
-
-               Collector<OT> outputCollector = null;
-               if (isWorksetUpdate) {
-                       outputCollector = createWorksetUpdateOutputCollector();
-
-                       // we need the WorksetUpdateOutputCollector separately 
to count the collected elements
-                       if (isWorksetIteration) {
-                               worksetUpdateOutputCollector = 
(WorksetUpdateOutputCollector<OT>) outputCollector;
-                       }
-               }
-
-               if (isSolutionSetUpdate) {
-                       if (isWorksetIteration) {
-                               outputCollector = 
createSolutionSetUpdateOutputCollector(outputCollector);
-                       }
-                       // Bulk iteration with termination criterion
-                       else {
-                               outputCollector = new Collector<OT>() {
-                                       @Override
-                                       public void collect(OT record) {}
-                                       @Override
-                                       public void close() {}
-                               };
-                       }
-
-                       if (!isWorksetUpdate) {
-                               solutionSetUpdateBarrier = 
SolutionSetUpdateBarrierBroker.instance().get(brokerKey());
-                       }
-               }
-
-               setLastOutputCollector(outputCollector);
-       }
-
-       @Override
-       public void run() throws Exception {
-               
-               SuperstepKickoffLatch nextSuperStepLatch = 
SuperstepKickoffLatchBroker.instance().get(brokerKey());
-               
-               while (this.running && !terminationRequested()) {
-
-                       if (log.isInfoEnabled()) {
-                               log.info(formatLogString("starting iteration [" 
+ currentIteration() + "]"));
-                       }
-
-                       super.run();
-
-                       // check if termination was requested
-                       verifyEndOfSuperstepState();
-
-                       if (isWorksetUpdate && isWorksetIteration) {
-                               // aggregate workset update element count
-                               long numCollected = 
worksetUpdateOutputCollector.getElementsCollectedAndReset();
-                               worksetAggregator.aggregate(numCollected);
-
-                       }
-
-                       if (log.isInfoEnabled()) {
-                               log.info(formatLogString("finishing iteration 
[" + currentIteration() + "]"));
-                       }
-                       
-                       if (isWorksetUpdate) {
-                               // notify iteration head if responsible for 
workset update
-                               worksetBackChannel.notifyOfEndOfSuperstep();
-                       } else if (isSolutionSetUpdate) {
-                               // notify iteration head if responsible for 
solution set update
-                               
solutionSetUpdateBarrier.notifySolutionSetUpdate();
-                       }
-
-                       boolean terminate = 
nextSuperStepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
-                       if (terminate) {
-                               requestTermination();
-                       }
-                       else {
-                               incrementIterationCounter();
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
new file mode 100644
index 0000000..9e0b560
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailTask.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.iterative.task;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
+import 
org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
+import org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatch;
+import 
org.apache.flink.runtime.iterative.concurrent.SuperstepKickoffLatchBroker;
+import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
+import org.apache.flink.util.Collector;
+
+/**
+ * An iteration tail, which runs a driver inside.
+ * <p>
+ * If the iteration state is updated, the output of this task will be sent 
back to the {@link IterationHeadTask} via
+ * a BackChannel for the workset -OR- a HashTable for the solution set. 
Therefore this
+ * task must be scheduled on the same instance as the head. It's also possible 
for the tail to update *both* the workset
+ * and the solution set.
+ * <p>
+ * If there is a separate solution set tail, the iteration head has to make 
sure to wait for it to finish.
+ */
+public class IterationTailTask<S extends Function, OT> extends 
AbstractIterativeTask<S, OT> {
+
+       private static final Logger log = 
LoggerFactory.getLogger(IterationTailTask.class);
+
+       private SolutionSetUpdateBarrier solutionSetUpdateBarrier;
+
+       private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
+       
+
+       @Override
+       protected void initialize() throws Exception {
+               super.initialize();
+
+               // sanity check: the tail has to update either the workset or 
the solution set
+               if (!isWorksetUpdate && !isSolutionSetUpdate) {
+                       throw new RuntimeException("The iteration tail doesn't 
update workset or the solution set.");
+               }
+
+               // set the last output collector of this task to reflect the 
iteration tail state update:
+               // a) workset update,
+               // b) solution set update, or
+               // c) merged workset and solution set update
+
+               Collector<OT> outputCollector = null;
+               if (isWorksetUpdate) {
+                       outputCollector = createWorksetUpdateOutputCollector();
+
+                       // we need the WorksetUpdateOutputCollector separately 
to count the collected elements
+                       if (isWorksetIteration) {
+                               worksetUpdateOutputCollector = 
(WorksetUpdateOutputCollector<OT>) outputCollector;
+                       }
+               }
+
+               if (isSolutionSetUpdate) {
+                       if (isWorksetIteration) {
+                               outputCollector = 
createSolutionSetUpdateOutputCollector(outputCollector);
+                       }
+                       // Bulk iteration with termination criterion
+                       else {
+                               outputCollector = new Collector<OT>() {
+                                       @Override
+                                       public void collect(OT record) {}
+                                       @Override
+                                       public void close() {}
+                               };
+                       }
+
+                       if (!isWorksetUpdate) {
+                               solutionSetUpdateBarrier = 
SolutionSetUpdateBarrierBroker.instance().get(brokerKey());
+                       }
+               }
+
+               setLastOutputCollector(outputCollector);
+       }
+
+       @Override
+       public void run() throws Exception {
+               
+               SuperstepKickoffLatch nextSuperStepLatch = 
SuperstepKickoffLatchBroker.instance().get(brokerKey());
+               
+               while (this.running && !terminationRequested()) {
+
+                       if (log.isInfoEnabled()) {
+                               log.info(formatLogString("starting iteration [" 
+ currentIteration() + "]"));
+                       }
+
+                       super.run();
+
+                       // check if termination was requested
+                       verifyEndOfSuperstepState();
+
+                       if (isWorksetUpdate && isWorksetIteration) {
+                               // aggregate workset update element count
+                               long numCollected = 
worksetUpdateOutputCollector.getElementsCollectedAndReset();
+                               worksetAggregator.aggregate(numCollected);
+
+                       }
+
+                       if (log.isInfoEnabled()) {
+                               log.info(formatLogString("finishing iteration 
[" + currentIteration() + "]"));
+                       }
+                       
+                       if (isWorksetUpdate) {
+                               // notify iteration head if responsible for 
workset update
+                               worksetBackChannel.notifyOfEndOfSuperstep();
+                       } else if (isSolutionSetUpdate) {
+                               // notify iteration head if responsible for 
solution set update
+                               
solutionSetUpdateBarrier.notifySolutionSetUpdate();
+                       }
+
+                       boolean terminate = 
nextSuperStepLatch.awaitStartOfSuperstepOrTermination(currentIteration() + 1);
+                       if (terminate) {
+                               requestTermination();
+                       }
+                       else {
+                               incrementIterationCounter();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
index 0e0bd26..4923b3b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/AbstractInvokable.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobgraph.tasks;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -28,7 +29,7 @@ import org.slf4j.LoggerFactory;
 /**
  * This is the abstract base class for every task that can be executed by a 
TaskManager.
  * Concrete tasks like the stream vertices of the batch tasks
- * (see {@link org.apache.flink.runtime.operators.RegularPactTask}) inherit 
from this class.
+ * (see {@link BatchTask}) inherit from this class.
  *
  * The TaskManager invokes the methods {@link #registerInputOutput()} and 
{@link #invoke()} in
  * this order when executing a task. The first method is responsible for 
setting up input and

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
index 4096f0c..8f72754 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
 
-public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends 
JoinDriver<IT1, IT2, OT> implements ResettablePactDriver<FlatJoinFunction<IT1, 
IT2, OT>, OT> {
+public abstract class AbstractCachedBuildSideJoinDriver<IT1, IT2, OT> extends 
JoinDriver<IT1, IT2, OT> implements ResettableDriver<FlatJoinFunction<IT1, IT2, 
OT>, OT> {
 
        private volatile JoinTaskIterator<IT1, IT2, OT> matchIterator;
        

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
index 38c74e0..8c964d4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractOuterJoinDriver.java
@@ -38,11 +38,11 @@ import org.slf4j.LoggerFactory;
  *
  * @see FlatJoinFunction
  */
-public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements 
PactDriver<FlatJoinFunction<IT1, IT2, OT>, OT> {
+public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> implements 
Driver<FlatJoinFunction<IT1, IT2, OT>, OT> {
        
        protected static final Logger LOG = 
LoggerFactory.getLogger(AbstractOuterJoinDriver.class);
        
-       protected PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> 
taskContext;
+       protected TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> taskContext;
        
        protected volatile JoinTaskIterator<IT1, IT2, OT> outerJoinIterator; // 
the iterator that does the actual outer join
        protected volatile boolean running;
@@ -50,7 +50,7 @@ public abstract class AbstractOuterJoinDriver<IT1, IT2, OT> 
implements PactDrive
        // 
------------------------------------------------------------------------
        
        @Override
-       public void setup(PactTaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> 
context) {
+       public void setup(TaskContext<FlatJoinFunction<IT1, IT2, OT>, OT> 
context) {
                this.taskContext = context;
                this.running = true;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
index 7b279ee..0c8dc34 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupCombineDriver.java
@@ -37,18 +37,18 @@ import org.slf4j.LoggerFactory;
 *
 * @see GroupCombineFunction
 */
-public class AllGroupCombineDriver<IN, OUT> implements 
PactDriver<GroupCombineFunction<IN, OUT>, OUT> {
+public class AllGroupCombineDriver<IN, OUT> implements 
Driver<GroupCombineFunction<IN, OUT>, OUT> {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(AllGroupCombineDriver.class);
 
-       private PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext;
+       private TaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext;
 
        private boolean objectReuseEnabled = false;
 
        // 
------------------------------------------------------------------------
 
        @Override
-       public void setup(PactTaskContext<GroupCombineFunction<IN, OUT>, OUT> 
context) {
+       public void setup(TaskContext<GroupCombineFunction<IN, OUT>, OUT> 
context) {
                this.taskContext = context;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
index a20fddf..255c57c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
@@ -45,11 +45,11 @@ import org.apache.flink.util.MutableObjectIterator;
  * 
  * @see org.apache.flink.api.common.functions.GroupReduceFunction
  */
-public class AllGroupReduceDriver<IT, OT> implements 
PactDriver<GroupReduceFunction<IT, OT>, OT> {
+public class AllGroupReduceDriver<IT, OT> implements 
Driver<GroupReduceFunction<IT, OT>, OT> {
        
        private static final Logger LOG = 
LoggerFactory.getLogger(AllGroupReduceDriver.class);
 
-       private PactTaskContext<GroupReduceFunction<IT, OT>, OT> taskContext;
+       private TaskContext<GroupReduceFunction<IT, OT>, OT> taskContext;
        
        private MutableObjectIterator<IT> input;
 
@@ -62,7 +62,7 @@ public class AllGroupReduceDriver<IT, OT> implements 
PactDriver<GroupReduceFunct
        // 
------------------------------------------------------------------------
 
        @Override
-       public void setup(PactTaskContext<GroupReduceFunction<IT, OT>, OT> 
context) {
+       public void setup(TaskContext<GroupReduceFunction<IT, OT>, OT> context) 
{
                this.taskContext = context;
        }
        

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
index 1f58c1b..f27ae34 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllReduceDriver.java
@@ -38,11 +38,11 @@ import org.apache.flink.util.MutableObjectIterator;
  * 
  * @see org.apache.flink.api.common.functions.ReduceFunction
  */
-public class AllReduceDriver<T> implements PactDriver<ReduceFunction<T>, T> {
+public class AllReduceDriver<T> implements Driver<ReduceFunction<T>, T> {
        
        private static final Logger LOG = 
LoggerFactory.getLogger(AllReduceDriver.class);
 
-       private PactTaskContext<ReduceFunction<T>, T> taskContext;
+       private TaskContext<ReduceFunction<T>, T> taskContext;
        
        private MutableObjectIterator<T> input;
 
@@ -55,7 +55,7 @@ public class AllReduceDriver<T> implements 
PactDriver<ReduceFunction<T>, T> {
        // 
------------------------------------------------------------------------
 
        @Override
-       public void setup(PactTaskContext<ReduceFunction<T>, T> context) {
+       public void setup(TaskContext<ReduceFunction<T>, T> context) {
                this.taskContext = context;
                this.running = true;
        }

Reply via email to