http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
new file mode 100644
index 0000000..7a529b9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.util.event.EventListener;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The task event dispatcher dispatches events flowing backwards from a consumer
+ * to a producer. It only supports programs, where the producer and consumer
+ * are running at the same time.
+ * <p>
+ * The publish method is either called from the local input channel or the
+ * network I/O thread.
+ */
+public class TaskEventDispatcher {
+
+       Table<ExecutionAttemptID, IntermediateResultPartitionID, BufferWriter> registeredWriters = HashBasedTable.create();
+
+       public void registerWriterForIncomingTaskEvents(ExecutionAttemptID executionId, IntermediateResultPartitionID partitionId, BufferWriter listener) {
+               synchronized (registeredWriters) {
+                       if (registeredWriters.put(executionId, partitionId, listener) != null) {
+                               throw new IllegalStateException("Event dispatcher already contains buffer writer.");
+                       }
+               }
+       }
+
+       public void unregisterWriters(ExecutionAttemptID executionId) {
+               synchronized (registeredWriters) {
+                       List<IntermediateResultPartitionID> writersToUnregister = new ArrayList<IntermediateResultPartitionID>();
+
+                       for (IntermediateResultPartitionID partitionId : registeredWriters.row(executionId).keySet()) {
+                               writersToUnregister.add(partitionId);
+                       }
+
+                       for (IntermediateResultPartitionID partitionId : writersToUnregister) {
+                               registeredWriters.remove(executionId, partitionId);
+                       }
+               }
+       }
+
+       /**
+        * Publishes the event to the registered {@link EventListener} instance.
+        * <p>
+        * This method is either called from a local input channel or the network
+        * I/O thread on behalf of a remote input channel.
+        */
+       public boolean publish(ExecutionAttemptID executionId, IntermediateResultPartitionID partitionId, TaskEvent event) {
+               EventListener<TaskEvent> listener = registeredWriters.get(executionId, partitionId);
+
+               if (listener != null) {
+                       listener.onEvent(event);
+                       return true;
+               }
+
+               return false;
+       }
+
+       int getNumberOfRegisteredWriters() {
+               synchronized (registeredWriters) {
+                       return registeredWriters.size();
+               }
+       }
+}
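
For reference, a minimal sketch of how the new dispatcher is meant to be wired in, assuming the new api.writer.BufferWriter implements EventListener<TaskEvent> (as the publish() lookup above implies); the task-side variables are placeholders, only the three dispatcher methods come from the class above:

    // producer side: expose the partition's writer for backwards-flowing events
    TaskEventDispatcher dispatcher = new TaskEventDispatcher();
    dispatcher.registerWriterForIncomingTaskEvents(executionId, partitionId, bufferWriter);

    // consumer side (local input channel or network I/O thread):
    // returns false if no writer is registered for this (execution, partition) pair
    boolean delivered = dispatcher.publish(executionId, partitionId, taskEvent);

    // task teardown: drop all writers of this execution attempt
    dispatcher.unregisterWriters(executionId);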

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractRecordReader.java
deleted file mode 100644
index 3af9aef..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractRecordReader.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import org.apache.flink.runtime.event.task.AbstractTaskEvent;
-import org.apache.flink.runtime.event.task.EventListener;
-import org.apache.flink.runtime.event.task.EventNotificationManager;
-
-/**
- * This is an abstract base class for a record reader, either dealing with mutable or immutable records,
- * and dealing with reads from single gates (single end points) or multiple gates (union).
- */
-public abstract class AbstractRecordReader implements ReaderBase {
-       
-       
-       private final EventNotificationManager eventHandler = new EventNotificationManager();
-       
-       private int numEventsUntilEndOfSuperstep = -1;
-       
-       private int endOfSuperstepEventsCount;
-       
-       // --------------------------------------------------------------------------------------------
-       
-       /**
-        * Subscribes the listener object to receive events of the given type.
-        * 
-        * @param eventListener
-        *        the listener object to register
-        * @param eventType
-        *        the type of event to register the listener for
-        */
-       @Override
-       public void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
-               this.eventHandler.subscribeToEvent(eventListener, eventType);
-       }
-
-       /**
-        * Removes the subscription for events of the given type for the listener object.
-        * 
-        * @param eventListener The listener object to cancel the subscription for.
-        * @param eventType The type of the event to cancel the subscription for.
-        */
-       @Override
-       public void unsubscribeFromEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
-               this.eventHandler.unsubscribeFromEvent(eventListener, eventType);
-       }
-       
-       
-       protected void handleEvent(AbstractTaskEvent evt) {
-               this.eventHandler.deliverEvent(evt);
-       }
-       
-       @Override
-       public void setIterative(int numEventsUntilEndOfSuperstep) {
-               this.numEventsUntilEndOfSuperstep = numEventsUntilEndOfSuperstep;
-       }
-
-       @Override
-       public void startNextSuperstep() {
-               if (this.numEventsUntilEndOfSuperstep == -1) {
-                       throw new IllegalStateException("Called 'startNextSuperstep()' in a non-iterative reader.");
-               }
-               else if (endOfSuperstepEventsCount < numEventsUntilEndOfSuperstep) {
-                       throw new IllegalStateException("Premature 'startNextSuperstep()'. Not yet reached the end-of-superstep.");
-               }
-               this.endOfSuperstepEventsCount = 0;
-       }
-       
-       @Override
-       public boolean hasReachedEndOfSuperstep() {
-               return endOfSuperstepEventsCount== numEventsUntilEndOfSuperstep;
-       }
-       
-       protected boolean incrementEndOfSuperstepEventAndCheck() {
-               if (numEventsUntilEndOfSuperstep == -1) {
-                       throw new IllegalStateException("Received EndOfSuperstep event in a non-iterative reader.");
-               }
-               
-               endOfSuperstepEventsCount++;
-               
-               if (endOfSuperstepEventsCount > numEventsUntilEndOfSuperstep) {
-                       throw new IllegalStateException("Received EndOfSuperstep events beyond the number to indicate the end of the superstep");
-               }
-               
-               return endOfSuperstepEventsCount== numEventsUntilEndOfSuperstep;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractSingleGateRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractSingleGateRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractSingleGateRecordReader.java
deleted file mode 100644
index e308da8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractSingleGateRecordReader.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.AbstractTaskEvent;
-import org.apache.flink.runtime.io.network.gates.InputGate;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-/**
- * This is an abstract base class for a record reader, either dealing with mutable or immutable records.
- * 
- * @param <T> The type of the record that can be read from this record reader.
- */
-public abstract class AbstractSingleGateRecordReader<T extends IOReadableWritable> extends AbstractRecordReader {
-       
-       /**
-        * The input gate associated with the record reader.
-        */
-       protected final InputGate<T> inputGate;
-       
-       // --------------------------------------------------------------------------------------------
-
-       protected AbstractSingleGateRecordReader(AbstractInvokable invokable) {
-               this.inputGate = invokable.getEnvironment().createAndRegisterInputGate();
-       }
-
-       /**
-        * Returns the number of input channels wired to this reader's input gate.
-        * 
-        * @return the number of input channels wired to this reader's input gate
-        */
-       public int getNumberOfInputChannels() {
-               return this.inputGate.getNumberOfInputChannels();
-       }
-
-       /**
-        * Publishes an event.
-        * 
-        * @param event
-        *        the event to be published
-        * @throws IOException
-        *         thrown if an error occurs while transmitting the event
-        * @throws InterruptedException
-        *         thrown if the thread is interrupted while waiting for the event to be published
-        */
-       @Override
-       public void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException {
-               // Delegate call to input gate to send events
-               this.inputGate.publishEvent(event);
-       }
-       
-       @Override
-       public void publishEvent(AbstractTaskEvent event, int inputNumber) throws IOException, InterruptedException {
-               if(inputNumber==0) {
-                       publishEvent(event);
-               }else {
-                       throw new IOException("RecordReader has only 1 input");
-               }
-       }
-
-       public InputGate<T> getInputGate() {
-               return this.inputGate;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractUnionRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractUnionRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractUnionRecordReader.java
deleted file mode 100644
index 00ccfee..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractUnionRecordReader.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.AbstractTaskEvent;
-import org.apache.flink.runtime.io.network.gates.InputChannelResult;
-import org.apache.flink.runtime.io.network.gates.InputGate;
-import org.apache.flink.runtime.io.network.gates.RecordAvailabilityListener;
-
-public abstract class AbstractUnionRecordReader<T extends IOReadableWritable> extends AbstractRecordReader implements RecordAvailabilityListener<T> {
-
-       /**
-        * The set of all input gates.
-        */
-       private final InputGate<T>[] allInputGates;
-       
-       /**
-        * The set of unclosed input gates.
-        */
-       private final Set<InputGate<T>> remainingInputGates;
-
-       /**
-        * Queue with indices of channels that store at least one available record.
-        */
-       private final ArrayDeque<InputGate<T>> availableInputGates = new ArrayDeque<InputGate<T>>();
-       
-       /**
-        * The next input gate to read a record from.
-        */
-       private InputGate<T> nextInputGateToReadFrom;
-
-       
-       @Override
-       public boolean isInputClosed() {
-               return this.remainingInputGates.isEmpty();
-       }
-       
-       /**
-        * Constructs a new mutable union record reader.
-        * 
-        * @param recordReaders
-        *        the individual mutable record readers whose input is used to construct the union
-        */
-       @SuppressWarnings("unchecked")
-       protected AbstractUnionRecordReader(MutableRecordReader<T>[] recordReaders) {
-
-               if (recordReaders == null) {
-                       throw new IllegalArgumentException("Provided argument recordReaders is null");
-               }
-
-               if (recordReaders.length < 2) {
-                       throw new IllegalArgumentException(
-                               "The mutable union record reader must at least be initialized with two individual mutable record readers");
-               }
-               
-               this.allInputGates = new InputGate[recordReaders.length];
-               this.remainingInputGates = new HashSet<InputGate<T>>((int) (recordReaders.length * 1.6f));
-               
-               for (int i = 0; i < recordReaders.length; i++) {
-                       InputGate<T> inputGate = recordReaders[i].getInputGate();
-                       inputGate.registerRecordAvailabilityListener(this);
-                       this.allInputGates[i] = inputGate;
-                       this.remainingInputGates.add(inputGate);
-               }
-       }
-       
-       
-       @Override
-       public void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException {
-               for (InputGate<T> gate : this.allInputGates) {
-                       gate.publishEvent(event);
-               }
-       }
-       
-       @Override
-       public void publishEvent(AbstractTaskEvent event, int inputNumber) throws IOException,
-                       InterruptedException {
-               allInputGates[inputNumber].publishEvent(event);
-       }
-       
-       @Override
-       public void reportRecordAvailability(InputGate<T> inputGate) {
-               synchronized (this.availableInputGates) {
-                       this.availableInputGates.add(inputGate);
-                       this.availableInputGates.notifyAll();
-               }
-       }
-       
-       protected boolean getNextRecord(T target) throws IOException, InterruptedException {
-
-               while (true) {
-                       // has the current input gate more data?
-                       if (this.nextInputGateToReadFrom == null) {
-                               if (this.remainingInputGates.isEmpty()) {
-                                       return false;
-                               }
-                               
-                               this.nextInputGateToReadFrom = getNextAvailableInputGate();
-                       }
-
-                       InputChannelResult result = this.nextInputGateToReadFrom.readRecord(target);
-                       switch (result) {
-                               case INTERMEDIATE_RECORD_FROM_BUFFER: // record is available and we can stay on the same channel
-                                       return true;
-                                       
-                               case LAST_RECORD_FROM_BUFFER: // record is available, but we need to re-check the channels
-                                       this.nextInputGateToReadFrom = null;
-                                       return true;
-                                       
-                               case END_OF_SUPERSTEP:
-                                       this.nextInputGateToReadFrom = null;
-                                       if (incrementEndOfSuperstepEventAndCheck()) {
-                                               return false; // end of the superstep
-                                       }
-                                       else {
-                                               break; // fall through and wait for next record/event
-                                       }
-                                       
-                               case TASK_EVENT:        // event for the subscribers is available
-                                       handleEvent(this.nextInputGateToReadFrom.getCurrentEvent());
-                                       this.nextInputGateToReadFrom = null;
-                                       break;
-                                       
-                               case END_OF_STREAM: // one gate is empty
-                                       this.remainingInputGates.remove(this.nextInputGateToReadFrom);
-                                       this.nextInputGateToReadFrom = null;
-                                       break;
-                                       
-                               case NONE: // gate processed an internal event and could not return a record on this call
-                                       this.nextInputGateToReadFrom = null;
-                                       break;
-                       }
-               }
-       }
-       
-       private InputGate<T> getNextAvailableInputGate() throws InterruptedException {
-               synchronized (this.availableInputGates) {
-                       while (this.availableInputGates.isEmpty()) {
-                               this.availableInputGates.wait();
-                       }
-                       return this.availableInputGates.pop();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/BufferWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/BufferWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/BufferWriter.java
deleted file mode 100644
index 8eb117d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/BufferWriter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.AbstractTaskEvent;
-import org.apache.flink.runtime.event.task.EventListener;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.channels.EndOfSuperstepEvent;
-import org.apache.flink.runtime.io.network.gates.OutputGate;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-public class BufferWriter {
-
-       protected final OutputGate outputGate;
-
-       public BufferWriter(AbstractInvokable invokable) {
-               this.outputGate = invokable.getEnvironment().createAndRegisterOutputGate();
-       }
-
-       public void sendBuffer(Buffer buffer, int targetChannel) throws IOException, InterruptedException {
-               this.outputGate.sendBuffer(buffer, targetChannel);
-       }
-
-       public void sendEvent(AbstractEvent event, int targetChannel) throws IOException, InterruptedException {
-               this.outputGate.sendEvent(event, targetChannel);
-       }
-
-       public void sendBufferAndEvent(Buffer buffer, AbstractEvent event, int targetChannel) throws IOException, InterruptedException {
-               this.outputGate.sendBufferAndEvent(buffer, event, targetChannel);
-       }
-
-       public void broadcastBuffer(Buffer buffer) throws IOException, InterruptedException {
-               this.outputGate.broadcastBuffer(buffer);
-       }
-
-       public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
-               this.outputGate.broadcastEvent(event);
-       }
-
-       // -----------------------------------------------------------------------------------------------------------------
-
-       public void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
-               this.outputGate.subscribeToEvent(eventListener, eventType);
-       }
-
-       public void unsubscribeFromEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
-               this.outputGate.unsubscribeFromEvent(eventListener, eventType);
-       }
-
-       public void sendEndOfSuperstep() throws IOException, InterruptedException {
-               this.outputGate.broadcastEvent(EndOfSuperstepEvent.INSTANCE);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ChannelSelector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ChannelSelector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ChannelSelector.java
deleted file mode 100644
index c780f87..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ChannelSelector.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api;
-
-import org.apache.flink.core.io.IOReadableWritable;
-
-/**
- * Objects implementing this interface are passed to an {@link org.apache.flink.runtime.io.network.gates.OutputGate}. When a record is sent through the output
- * gate, the channel selector object is called to determine to which {@link org.apache.flink.runtime.io.network.channels.OutputChannel} objects the record
- * shall be passed on.
- * 
- * @param <T>
- *        the type of record which is sent through the attached output gate
- */
-public interface ChannelSelector<T extends IOReadableWritable> {
-
-       /**
-        * Called to determine to which attached {@link org.apache.flink.runtime.io.network.channels.OutputChannel} objects the given record shall be forwarded.
-        * 
-        * @param record
-        *        the record to the determine the output channels for
-        * @param numberOfOutputChannels
-        *        the total number of output channels which are attached to respective output gate
-        * @return a (possibly empty) array of integer numbers which indicate the indices of the output channels through
-        *         which the record shall be forwarded
-        */
-       int[] selectChannels(T record, int numberOfOutputChannels);
-}
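
The selection contract removed here is easiest to see with a small example selector. A hedged sketch that routes each record to a single channel by key hash; KeyedRecord and its getKey() accessor are invented for illustration and would have to implement IOReadableWritable, only the selectChannels(record, numberOfOutputChannels) signature comes from the interface above:

    public class KeyHashChannelSelector<T extends KeyedRecord> implements ChannelSelector<T> {

        // reused on every call to avoid allocating a fresh array per record
        private final int[] channel = new int[1];

        @Override
        public int[] selectChannels(T record, int numberOfOutputChannels) {
            // send all records with the same key to the same output channel
            channel[0] = (record.getKey().hashCode() & 0x7fffffff) % numberOfOutputChannels;
            return channel;
        }
    }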

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
new file mode 100644
index 0000000..49d7958
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.task.RuntimeEvent;
+
+import java.io.IOException;
+
+public class EndOfPartitionEvent extends RuntimeEvent {
+
+       public static final EndOfPartitionEvent INSTANCE = new EndOfPartitionEvent();
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               // Nothing to do here
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               // Nothing to do here
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
new file mode 100644
index 0000000..5d0199c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.task.RuntimeEvent;
+
+import java.io.IOException;
+
+/**
+ * Marks the end of a superstep of one particular iteration head
+ */
+public class EndOfSuperstepEvent extends RuntimeEvent {
+
+       public static final EndOfSuperstepEvent INSTANCE = new EndOfSuperstepEvent();
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+       }
+}
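
Both events added here are stateless markers, which is why their read()/write() bodies stay empty and a shared INSTANCE is enough. For contrast, a hypothetical runtime event carrying state would serialize its fields through the same DataInputView/DataOutputView contract (the SuperstepNumberEvent name and its field are illustrative only):

    public class SuperstepNumberEvent extends RuntimeEvent {

        private int superstepNumber;

        public SuperstepNumberEvent() {
            // no-arg constructor for deserialization
        }

        public SuperstepNumberEvent(int superstepNumber) {
            this.superstepNumber = superstepNumber;
        }

        @Override
        public void write(DataOutputView out) throws IOException {
            out.writeInt(superstepNumber);
        }

        @Override
        public void read(DataInputView in) throws IOException {
            superstepNumber = in.readInt();
        }
    }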

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java
deleted file mode 100644
index 04027f6..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-
-public interface MutableReader<T extends IOReadableWritable> extends ReaderBase {
-       
-       boolean next(T target) throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableRecordReader.java
deleted file mode 100644
index e3a9522..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableRecordReader.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.gates.InputChannelResult;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-public class MutableRecordReader<T extends IOReadableWritable> extends AbstractSingleGateRecordReader<T> implements MutableReader<T> {
-       
-       private boolean endOfStream;
-       
-       
-       /**
-        * Constructs a new mutable record reader and registers a new input gate with the application's environment.
-        * 
-        * @param taskBase The application that instantiated the record reader.
-        */
-       public MutableRecordReader(AbstractInvokable taskBase) {
-               super(taskBase);
-       }
-       
-       // --------------------------------------------------------------------------------------------
-       
-       @Override
-       public boolean next(final T target) throws IOException, InterruptedException {
-               if (this.endOfStream) {
-                       return false;
-                       
-               }
-               while (true) {
-                       InputChannelResult result = this.inputGate.readRecord(target);
-                       switch (result) {
-                               case INTERMEDIATE_RECORD_FROM_BUFFER:
-                               case LAST_RECORD_FROM_BUFFER:
-                                       return true;
-                                       
-                               case END_OF_SUPERSTEP:
-                                       if (incrementEndOfSuperstepEventAndCheck()) {
-                                               return false; // end of the superstep
-                                       }
-                                       else {
-                                               break; // fall through and wait for next record/event
-                                       }
-                                       
-                               case TASK_EVENT:
-                                       handleEvent(this.inputGate.getCurrentEvent());
-                                       break;  // fall through to get next record
-                               
-                               case END_OF_STREAM:
-                                       this.endOfStream = true;
-                                       return false;
-                                       
-                               default:
-                                       ; // fall through to get next record
-                       }
-               }
-       }
-       
-       @Override
-       public boolean isInputClosed() {
-               return this.endOfStream;
-       }
-
-       @Override
-       public void setIterative(int numEventsUntilEndOfSuperstep) {
-               // sanity check for debug purposes
-               if (numEventsUntilEndOfSuperstep != getNumberOfInputChannels()) {
-                       throw new IllegalArgumentException("Number of events till end of superstep is different from the number of input channels.");
-               }
-               super.setIterative(numEventsUntilEndOfSuperstep);
-       }
-}
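
For orientation, the removed mutable reader API was consumed with a single reuse object that next() fills in place. Only the next(T) contract comes from the classes above; MyRecord, process() and the surrounding task ('this' is the AbstractInvokable) are placeholders:

    MutableRecordReader<MyRecord> reader = new MutableRecordReader<MyRecord>(this);

    MyRecord reuse = new MyRecord();
    while (reader.next(reuse)) {
        // 'reuse' holds the current record and is overwritten by the next call
        process(reuse);
    }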

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableUnionRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableUnionRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableUnionRecordReader.java
deleted file mode 100644
index 2f6e2d2..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableUnionRecordReader.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-
-public class MutableUnionRecordReader<T extends IOReadableWritable> extends AbstractUnionRecordReader<T> implements MutableReader<T> {
-
-       
-       /**
-        * Constructs a new mutable union record reader.
-        * 
-        * @param recordReaders
-        *        the individual mutable record readers whose input is used to construct the union
-        */
-       public MutableUnionRecordReader(MutableRecordReader<T>[] recordReaders) {
-               super(recordReaders);
-       }
-
-       @Override
-       public boolean next(T target) throws IOException, InterruptedException {
-               return getNextRecord(target);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/Reader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/Reader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/Reader.java
deleted file mode 100644
index 9be0978..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/Reader.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-
-/**
- * A reader interface to read records from an input.
- * 
- * @param <T> The type of the record that can be emitted with this record writer
- */
-public interface Reader<T extends IOReadableWritable> extends ReaderBase {
-
-       boolean hasNext() throws IOException, InterruptedException;
-
-       T next() throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ReaderBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ReaderBase.java
deleted file mode 100644
index 0b8069a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ReaderBase.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.event.task.AbstractTaskEvent;
-import org.apache.flink.runtime.event.task.EventListener;
-
-
-/**
- *
- */
-public interface ReaderBase {
-
-       boolean isInputClosed();
-       
-       /**
-        * Subscribes the listener object to receive events of the given type.
-        * 
-        * @param eventListener
-        *        the listener object to register
-        * @param eventType
-        *        the type of event to register the listener for
-        */
-       void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType);
-       
-       /**
-        * Removes the subscription for events of the given type for the listener object.
-        * 
-        * @param eventListener
-        *        the listener object to cancel the subscription for
-        * @param eventType
-        *        the type of the event to cancel the subscription for
-        */
-       void unsubscribeFromEvent(final EventListener eventListener, final Class<? extends AbstractTaskEvent> eventType);
-
-       /**
-        * Publishes an event.
-        * 
-        * @param event
-        *        the event to be published
-        * @throws IOException
-        *         thrown if an error occurs while transmitting the event
-        * @throws InterruptedException
-        *         thrown if the thread is interrupted while waiting for the event to be published
-        */
-       void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException;
-       
-       /**
-        * Publishes an event to a specific input.
-        * 
-        * @param event
-        *            the event to be published
-        * @param inputNumber
-        *            the number of the input that we want to publish the event to
-        * 
-        * @throws IOException
-        *             thrown if an error occurs while transmitting the event
-        * @throws InterruptedException
-        *             thrown if the thread is interrupted while waiting for the
-        *             event to be published
-        */
-       void publishEvent(AbstractTaskEvent event, int inputNumber) throws IOException, InterruptedException;
-       
-       
-       void setIterative(int numEventsUntilEndOfSuperstep);
-
-       
-       void startNextSuperstep();
-       
-       boolean hasReachedEndOfSuperstep();
-}
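
The iteration-related methods of the removed ReaderBase interface form a small protocol: setIterative() tells the reader how many EndOfSuperstep events to expect, the reading methods return false at the superstep boundary, and startNextSuperstep() resets the counter. A hedged sketch of the driver loop an iteration head would run against it; the reader, reuse record, process() and the termination test are placeholders, only the method names come from the interfaces removed in this commit:

    reader.setIterative(numberOfInputChannels);

    while (!terminationCriterionMet()) {
        // drain the current superstep; next() returns false at the boundary
        while (reader.next(reuse)) {
            process(reuse);
        }

        if (reader.hasReachedEndOfSuperstep()) {
            reader.startNextSuperstep();
        }
        else if (reader.isInputClosed()) {
            break; // END_OF_STREAM: no more supersteps will arrive
        }
    }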

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordReader.java
deleted file mode 100644
index fc97736..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordReader.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.gates.InputChannelResult;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-/**
- * A record writer connects an input gate to an application. It allows the application
- * query for incoming records and read them from input gate.
- * 
- * @param <T> The type of the record that can be read from this record reader.
- */
-public class RecordReader<T extends IOReadableWritable> extends AbstractSingleGateRecordReader<T> implements Reader<T> {
-       
-       private final Class<T> recordType;
-       
-       /**
-        * Stores the last read record.
-        */
-       private T lookahead;
-
-       /**
-        * Stores if more no more records will be received from the assigned input gate.
-        */
-       private boolean noMoreRecordsWillFollow;
-
-       // --------------------------------------------------------------------------------------------
-       
-       /**
-        * Constructs a new record reader and registers a new input gate with the application's environment.
-        * 
-        * @param taskBase
-        *        The application that instantiated the record reader.
-        * @param recordType
-        *        The class of records that can be read from the record reader.
-        */
-       public RecordReader(AbstractInvokable taskBase, Class<T> recordType) {
-               super(taskBase);
-               this.recordType = recordType;
-       }
-       
-       // --------------------------------------------------------------------------------------------
-       
-       /**
-        * Checks if at least one more record can be read from the associated input gate. This method may block
-        * until the associated input gate is able to read the record from one of its input channels.
-        * 
-        * @return <code>true</code>it at least one more record can be read from the associated input gate, otherwise
-        *         <code>false</code>
-        */
-       @Override
-       public boolean hasNext() throws IOException, InterruptedException{
-               if (this.lookahead != null) {
-                       return true;
-               } else {
-                       if (this.noMoreRecordsWillFollow) {
-                               return false;
-                       }
-                       
-                       T record = instantiateRecordType();
-                       
-                       while (true) {
-                               InputChannelResult result = this.inputGate.readRecord(record);
-                               switch (result) {
-                                       case INTERMEDIATE_RECORD_FROM_BUFFER:
-                                       case LAST_RECORD_FROM_BUFFER:
-                                               this.lookahead = record;
-                                               return true;
-                                               
-                                       case END_OF_SUPERSTEP:
-                                               if (incrementEndOfSuperstepEventAndCheck()) {
-                                                       return false;
-                                               }
-                                               else {
-                                                       break; // fall through and wait for next record/event
-                                               }
-                                               
-                                       case TASK_EVENT:
-                                               handleEvent(this.inputGate.getCurrentEvent());
-                                               break;
-                                               
-                                       case END_OF_STREAM:
-                                               this.noMoreRecordsWillFollow = true;
-                                               return false;
-                               
-                                       default:
-                                               ; // fall through the loop
-                               }
-                       }
-               }
-       }
-
-       /**
-        * Reads the current record from the associated input gate.
-        * 
-        * @return the current record from the associated input gate.
-        * @throws IOException
-        *         thrown if any error occurs while reading the record from the input gate
-        */
-       @Override
-       public T next() throws IOException, InterruptedException {
-               if (hasNext()) {
-                       T tmp = this.lookahead;
-                       this.lookahead = null;
-                       return tmp;
-               } else {
-                       return null;
-               }
-       }
-       
-       @Override
-       public boolean isInputClosed() {
-               return this.noMoreRecordsWillFollow;
-       }
-       
-       private T instantiateRecordType() {
-               try {
-                       return this.recordType.newInstance();
-               } catch (InstantiationException e) {
-                       throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
-               } catch (IllegalAccessException e) {
-                       throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
-               }
-       }
-}
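
The removed RecordReader is the immutable variant of the same API: hasNext() blocks until a record, an event, or the end of the stream arrives, and next() hands out a freshly instantiated record. A minimal usage sketch; MyRecord, process() and the invoking task ('this') are placeholders, the constructor and loop shape follow the class above:

    RecordReader<MyRecord> reader = new RecordReader<MyRecord>(this, MyRecord.class);

    while (reader.hasNext()) {
        // each call returns a new instance created via MyRecord's no-arg constructor
        MyRecord record = reader.next();
        process(record);
    }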

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordWriter.java
deleted file mode 100644
index 3ddd564..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordWriter.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
-import org.apache.flink.runtime.io.network.channels.EndOfSuperstepEvent;
-import org.apache.flink.runtime.io.network.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-/**
- * A record writer connects the application to an output gate. It allows the application
- * of emit (send out) to the output gate. The output gate will then take care of distributing
- * the emitted records among the output channels.
- * 
- * @param <T>
- *        the type of the record that can be emitted with this record writer
- */
-public class RecordWriter<T extends IOReadableWritable> extends BufferWriter {
-
-       private final BufferProvider bufferPool;
-
-       private final ChannelSelector<T> channelSelector;
-
-       private int numChannels;
-
-       /** RecordSerializer per outgoing channel */
-       private RecordSerializer<T>[] serializers;
-
-       // -----------------------------------------------------------------------------------------------------------------
-
-       public RecordWriter(AbstractInvokable invokable) {
-               this(invokable, new RoundRobinChannelSelector<T>());
-       }
-
-       public RecordWriter(AbstractInvokable invokable, ChannelSelector<T> 
channelSelector) {
-               // initialize the gate
-               super(invokable);
-
-               this.bufferPool = 
invokable.getEnvironment().getOutputBufferProvider();
-               this.channelSelector = channelSelector;
-       }
-
-       // 
-----------------------------------------------------------------------------------------------------------------
-
-       @SuppressWarnings("unchecked")
-       public void initializeSerializers() {
-               this.numChannels = this.outputGate.getNumChannels();
-               this.serializers = new RecordSerializer[numChannels];
-               for (int i = 0; i < this.numChannels; i++) {
-                       this.serializers[i] = new SpanningRecordSerializer<T>();
-               }
-       }
-
-       public void emit(final T record) throws IOException, 
InterruptedException {
-               for (int targetChannel : 
this.channelSelector.selectChannels(record, this.numChannels)) {
-                       // serialize with corresponding serializer and send 
full buffer
-                       RecordSerializer<T> serializer = 
this.serializers[targetChannel];
-
-                       RecordSerializer.SerializationResult result = 
serializer.addRecord(record);
-                       while (result.isFullBuffer()) {
-                               Buffer buffer = serializer.getCurrentBuffer();
-                               if (buffer != null) {
-                                       sendBuffer(buffer, targetChannel);
-                               }
-
-                               buffer = 
this.bufferPool.requestBufferBlocking(this.bufferPool.getBufferSize());
-                               result = serializer.setNextBuffer(buffer);
-                       }
-               }
-       }
-
-       public void flush() throws IOException, InterruptedException {
-               for (int targetChannel = 0; targetChannel < this.numChannels; 
targetChannel++) {
-                       RecordSerializer<T> serializer = 
this.serializers[targetChannel];
-
-                       Buffer buffer = serializer.getCurrentBuffer();
-                       if (buffer != null) {
-                               sendBuffer(buffer, targetChannel);
-                       }
-
-                       serializer.clear();
-               }
-       }
-
-       @Override
-       public void broadcastEvent(AbstractEvent event) throws IOException, 
InterruptedException {
-               for (int targetChannel = 0; targetChannel < this.numChannels; 
targetChannel++) {
-                       RecordSerializer<T> serializer = 
this.serializers[targetChannel];
-
-                       Buffer buffer = serializer.getCurrentBuffer();
-                       if (buffer == null) {
-                               super.sendEvent(event, targetChannel);
-                       } else {
-                               super.sendBufferAndEvent(buffer, event, 
targetChannel);
-
-                               buffer = 
this.bufferPool.requestBufferBlocking(this.bufferPool.getBufferSize());
-                               serializer.setNextBuffer(buffer);
-                       }
-               }
-       }
-
-       @Override
-       public void sendEndOfSuperstep() throws IOException, 
InterruptedException {
-               for (int targetChannel = 0; targetChannel < this.numChannels; 
targetChannel++) {
-                       RecordSerializer<T> serializer = 
this.serializers[targetChannel];
-
-                       Buffer buffer = serializer.getCurrentBuffer();
-                       if (buffer == null) {
-                               super.sendEvent(EndOfSuperstepEvent.INSTANCE, 
targetChannel);
-                       } else {
-                               super.sendBufferAndEvent(buffer, 
EndOfSuperstepEvent.INSTANCE, targetChannel);
-
-                               buffer = 
this.bufferPool.requestBufferBlocking(this.bufferPool.getBufferSize());
-                               serializer.setNextBuffer(buffer);
-                       }
-               }
-       }
-       
-       public void clearBuffers() {
-               if (this.serializers != null) {
-                       for (RecordSerializer<?> s: this.serializers) {
-                               Buffer b = s.getCurrentBuffer();
-                               if (b != null) {
-                                       b.recycleBuffer();
-                               }
-                       }
-               }
-       }
-}
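The emit() method removed above follows a simple cycle: serialize the record for each selected channel, ship every buffer that fills up, and request a fresh buffer from the pool before continuing. The standalone sketch below mimics that cycle with a plain ByteBuffer; every name in it is illustrative, and none of it is Flink API.

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class EmitLoopSketch {

    /** Collects "sent" buffers instead of handing them to a real output channel. */
    static final List<ByteBuffer> SENT = new ArrayList<ByteBuffer>();

    static ByteBuffer currentBuffer = ByteBuffer.allocate(8);

    static void emit(String record) {
        byte[] bytes = record.getBytes(StandardCharsets.UTF_8);
        int written = 0;
        // Keep writing; whenever the buffer fills up, send it and request a new one,
        // mirroring the serialize/flush/request loop of the deleted emit() method.
        while (written < bytes.length) {
            int toWrite = Math.min(currentBuffer.remaining(), bytes.length - written);
            currentBuffer.put(bytes, written, toWrite);
            written += toWrite;
            if (!currentBuffer.hasRemaining()) {
                sendBuffer(currentBuffer);
                currentBuffer = ByteBuffer.allocate(8); // stands in for requesting a pool buffer
            }
        }
    }

    static void sendBuffer(ByteBuffer buffer) {
        buffer.flip();
        SENT.add(buffer);
    }

    public static void main(String[] args) {
        emit("hello, record writer");
        System.out.println("full buffers sent: " + SENT.size());
    }
}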

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RoundRobinChannelSelector.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RoundRobinChannelSelector.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RoundRobinChannelSelector.java
deleted file mode 100644
index 668046f..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RoundRobinChannelSelector.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import org.apache.flink.core.io.IOReadableWritable;
-
-/**
- * This is the default implementation of the {@link ChannelSelector} interface. It represents a simple
- * round-robin strategy, i.e. regardless of the record, exactly one of the attached output channels is
- * selected at a time.
- *
- * @param <T>
- *        the type of record which is sent through the attached output gate
- */
-public class RoundRobinChannelSelector<T extends IOReadableWritable> 
implements ChannelSelector<T> {
-
-       /**
-        * Stores the index of the channel to send the next record to.
-        */
-       private final int[] nextChannelToSendTo = new int[1];
-
-       /**
-        * Constructs a new default channel selector.
-        */
-       public RoundRobinChannelSelector() {
-               this.nextChannelToSendTo[0] = 0;
-       }
-
-
-       @Override
-       public int[] selectChannels(final T record, final int 
numberOfOutputChannels) {
-
-               this.nextChannelToSendTo[0] = (this.nextChannelToSendTo[0] + 1) 
% numberOfOutputChannels;
-
-               return this.nextChannelToSendTo;
-       }
-}
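The selector above advances its single-element index array before returning it, so consecutive records cycle through the attached channels. A minimal standalone sketch of the same arithmetic (illustrative names only, not Flink code):

public class RoundRobinSketch {

    private final int[] nextChannelToSendTo = new int[] { 0 };

    public int[] selectChannels(int numberOfOutputChannels) {
        // Advance first, then return: the first record goes to channel 1, not 0.
        nextChannelToSendTo[0] = (nextChannelToSendTo[0] + 1) % numberOfOutputChannels;
        return nextChannelToSendTo;
    }

    public static void main(String[] args) {
        RoundRobinSketch selector = new RoundRobinSketch();
        for (int i = 0; i < 6; i++) {
            System.out.println(selector.selectChannels(4)[0]); // prints 1 2 3 0 1 2
        }
    }
}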

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/UnionRecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/UnionRecordReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/UnionRecordReader.java
deleted file mode 100644
index 2f7ba1d..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/UnionRecordReader.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-
-public final class UnionRecordReader<T extends IOReadableWritable> extends 
AbstractUnionRecordReader<T> implements Reader<T> {
-       
-       private final Class<T> recordType;
-       
-       private T lookahead;
-       
-
-       public UnionRecordReader(MutableRecordReader<T>[] recordReaders, 
Class<T> recordType) {
-               super(recordReaders);
-               this.recordType = recordType;
-       }
-
-       @Override
-       public boolean hasNext() throws IOException, InterruptedException {
-               if (this.lookahead != null) {
-                       return true;
-               } else {
-                       T record = instantiateRecordType();
-                       if (getNextRecord(record)) {
-                               this.lookahead = record;
-                               return true;
-                       } else {
-                               return false;
-                       }
-               }
-       }
-
-       @Override
-       public T next() throws IOException, InterruptedException {
-               if (hasNext()) {
-                       T tmp = this.lookahead;
-                       this.lookahead = null;
-                       return tmp;
-               } else {
-                       return null;
-               }
-       }
-       
-       private T instantiateRecordType() {
-               try {
-                       return this.recordType.newInstance();
-               } catch (InstantiationException e) {
-                       throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
-               } catch (IllegalAccessException e) {
-                       throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
-               }
-       }
-}
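The hasNext()/next() pair above relies on a one-record lookahead: hasNext() eagerly reads and caches a record, and next() hands the cached record out exactly once. The same pattern applied to a plain Iterator, as an illustrative sketch (names are hypothetical, not Flink API):

import java.util.Arrays;
import java.util.Iterator;

public class LookaheadSketch<T> {

    private final Iterator<T> source;
    private T lookahead;

    public LookaheadSketch(Iterator<T> source) {
        this.source = source;
    }

    public boolean hasNext() {
        if (lookahead != null) {
            return true;
        }
        if (source.hasNext()) {
            lookahead = source.next(); // pull one record ahead and cache it
            return true;
        }
        return false;
    }

    public T next() {
        if (hasNext()) {
            T tmp = lookahead;
            lookahead = null; // hand the cached record out exactly once
            return tmp;
        }
        return null;
    }

    public static void main(String[] args) {
        LookaheadSketch<String> reader = new LookaheadSketch<String>(Arrays.asList("a", "b").iterator());
        while (reader.hasNext()) {
            System.out.println(reader.next());
        }
    }
}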

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
new file mode 100644
index 0000000..cf4c302
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import 
org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.util.event.EventListener;
+
+/**
+ * A record-oriented runtime result reader, which wraps a {@link 
BufferReaderBase}.
+ * <p>
+ * This abstract base class is used by both the mutable and immutable record
+ * reader.
+ *
+ * @param <T> The type of the record that can be read with this record reader.
+ */
+abstract class AbstractRecordReader<T extends IOReadableWritable> implements 
ReaderBase {
+
+       private final BufferReaderBase reader;
+
+       private final RecordDeserializer<T>[] recordDeserializers;
+
+       private RecordDeserializer<T> currentRecordDeserializer;
+
+       private boolean isFinished;
+
+       protected AbstractRecordReader(BufferReaderBase reader) {
+               this.reader = reader;
+
+               // Initialize one deserializer per input channel
+               this.recordDeserializers = new 
AdaptiveSpanningRecordDeserializer[reader.getNumberOfInputChannels()];
+               for (int i = 0; i < recordDeserializers.length; i++) {
+                       recordDeserializers[i] = new 
AdaptiveSpanningRecordDeserializer<T>();
+               }
+       }
+
+       protected boolean getNextRecord(T target) throws IOException, 
InterruptedException {
+               if (isFinished) {
+                       return false;
+               }
+
+               while (true) {
+                       if (currentRecordDeserializer != null) {
+                               DeserializationResult result = 
currentRecordDeserializer.getNextRecord(target);
+
+                               if (result.isBufferConsumed()) {
+                                       
currentRecordDeserializer.getCurrentBuffer().recycle();
+                                       currentRecordDeserializer = null;
+                               }
+
+                               if (result.isFullRecord()) {
+                                       return true;
+                               }
+                       }
+
+                       final Buffer nextBuffer = reader.getNextBuffer();
+                       final int channelIndex = 
reader.getChannelIndexOfLastBuffer();
+
+                       if (nextBuffer == null) {
+                               if (reader.isFinished()) {
+                                       isFinished = true;
+                                       return false;
+                               }
+                               else if (reader.hasReachedEndOfSuperstep()) {
+                                       return false;
+                               }
+                               else {
+                                       // More data is coming...
+                                       continue;
+                               }
+                       }
+
+                       currentRecordDeserializer = 
recordDeserializers[channelIndex];
+                       currentRecordDeserializer.setNextBuffer(nextBuffer);
+               }
+       }
+
+       public void clearBuffers() {
+               for (RecordDeserializer<?> deserializer : recordDeserializers) {
+                       Buffer buffer = deserializer.getCurrentBuffer();
+                       if (buffer != null && !buffer.isRecycled()) {
+                               buffer.recycle();
+                       }
+               }
+       }
+
+       @Override
+       public void sendTaskEvent(TaskEvent event) throws IOException, 
InterruptedException {
+               reader.sendTaskEvent(event);
+       }
+
+       @Override
+       public boolean isFinished() {
+               return reader.isFinished();
+       }
+
+       @Override
+       public void subscribeToTaskEvent(EventListener<TaskEvent> 
eventListener, Class<? extends TaskEvent> eventType) {
+               reader.subscribeToTaskEvent(eventListener, eventType);
+       }
+
+       @Override
+       public void setIterativeReader() {
+               reader.setIterativeReader();
+       }
+
+       @Override
+       public void startNextSuperstep() {
+               reader.startNextSuperstep();
+       }
+
+       @Override
+       public boolean hasReachedEndOfSuperstep() {
+               return reader.hasReachedEndOfSuperstep();
+       }
+}
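The control flow of getNextRecord() above is: keep deserializing from the current buffer until a full record is produced, recycle a buffer once it is consumed, and otherwise pull the next buffer and bind it to the deserializer of its channel. A toy standalone sketch of that loop, using newline-terminated strings as "records" and byte chunks as "buffers" (illustrative only, not Flink code):

import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Queue;

public class RecordLoopSketch {

    static final Queue<byte[]> BUFFERS = new ArrayDeque<byte[]>();
    static final ByteArrayOutputStream PARTIAL = new ByteArrayOutputStream();
    static byte[] currentBuffer;
    static int position;

    static String getNextRecord() {
        while (true) {
            // Try to complete a record from the buffer we are currently consuming.
            if (currentBuffer != null) {
                while (position < currentBuffer.length) {
                    byte b = currentBuffer[position++];
                    if (b == '\n') {
                        String record = new String(PARTIAL.toByteArray(), StandardCharsets.UTF_8);
                        PARTIAL.reset();
                        return record; // corresponds to result.isFullRecord()
                    }
                    PARTIAL.write(b);
                }
                currentBuffer = null; // corresponds to result.isBufferConsumed(): recycle and drop it
            }
            // No full record yet: fetch the next buffer, or give up when none remain.
            byte[] next = BUFFERS.poll();
            if (next == null) {
                return null; // corresponds to reader.isFinished()
            }
            currentBuffer = next;
            position = 0;
        }
    }

    public static void main(String[] args) {
        BUFFERS.add("first rec".getBytes(StandardCharsets.UTF_8));
        BUFFERS.add("ord\nsecond record\n".getBytes(StandardCharsets.UTF_8));
        String record;
        while ((record = getNextRecord()) != null) {
            System.out.println(record); // records may span buffer boundaries
        }
    }
}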

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
new file mode 100644
index 0000000..1df7216
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import com.google.common.collect.Maps;
+import 
org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.PartitionInfo;
+import org.apache.flink.runtime.deployment.PartitionInfo.PartitionLocation;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.execution.RuntimeEnvironment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.RemoteAddress;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import 
org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionProvider;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.UnknownInputChannel;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.util.event.EventNotificationHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+public final class BufferReader implements BufferReaderBase {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(BufferReader.class);
+
+       private final Object requestLock = new Object();
+
+       private final RuntimeEnvironment environment;
+
+       private final NetworkEnvironment networkEnvironment;
+
+       private final EventNotificationHandler<TaskEvent> taskEventHandler = 
new EventNotificationHandler<TaskEvent>();
+
+       private final IntermediateDataSetID consumedResultId;
+
+       private final int totalNumberOfInputChannels;
+
+       private final int queueToRequest;
+
+       private final Map<IntermediateResultPartitionID, InputChannel> 
inputChannels;
+
+       private BufferPool bufferPool;
+
+       private boolean isReleased;
+
+       private boolean isTaskEvent;
+
+       // 
------------------------------------------------------------------------
+
+       private final BlockingQueue<InputChannel> inputChannelsWithData = new 
LinkedBlockingQueue<InputChannel>();
+
+       private final AtomicReference<EventListener<BufferReader>> 
readerListener = new AtomicReference<EventListener<BufferReader>>(null);
+
+       // 
------------------------------------------------------------------------
+
+       private boolean isIterativeReader;
+
+       private int currentNumEndOfSuperstepEvents;
+
+       private int channelIndexOfLastReadBuffer = -1;
+
+       private boolean hasRequestedPartitions = false;
+
+       public BufferReader(RuntimeEnvironment environment, NetworkEnvironment 
networkEnvironment, IntermediateDataSetID consumedResultId, int 
numberOfInputChannels, int queueToRequest) {
+
+               this.consumedResultId = checkNotNull(consumedResultId);
+               // Note: the environment is not fully initialized yet
+               this.environment = checkNotNull(environment);
+
+               this.networkEnvironment = networkEnvironment;
+
+               checkArgument(numberOfInputChannels >= 0);
+               this.totalNumberOfInputChannels = numberOfInputChannels;
+
+               checkArgument(queueToRequest >= 0);
+               this.queueToRequest = queueToRequest;
+
+               this.inputChannels = 
Maps.newHashMapWithExpectedSize(numberOfInputChannels);
+       }
+
+       // 
------------------------------------------------------------------------
+       // Properties
+       // 
------------------------------------------------------------------------
+
+       public void setBufferPool(BufferPool bufferPool) {
+               checkArgument(bufferPool.getNumberOfRequiredMemorySegments() == totalNumberOfInputChannels, "Buffer pool does not have enough buffers for this reader.");
+               checkState(this.bufferPool == null, "Buffer pool has already been set for reader.");
+
+               this.bufferPool = checkNotNull(bufferPool);
+       }
+
+       public IntermediateDataSetID getConsumedResultId() {
+               return consumedResultId;
+       }
+
+       public String getTaskNameWithSubtasks() {
+               return environment.getTaskNameWithSubtasks();
+       }
+
+       public IntermediateResultPartitionProvider 
getIntermediateResultPartitionProvider() {
+               return networkEnvironment.getPartitionManager();
+       }
+
+       public TaskEventDispatcher getTaskEventDispatcher() {
+               return networkEnvironment.getTaskEventDispatcher();
+       }
+
+       public ConnectionManager getConnectionManager() {
+               return networkEnvironment.getConnectionManager();
+       }
+
+       // TODO This is a work-around for the union reader
+       boolean hasInputChannelWithData() {
+               return !inputChannelsWithData.isEmpty();
+       }
+
+       /**
+        * Returns the total number of input channels for this reader.
+        * <p>
+        * Note: This number might be smaller than the current number of input channels
+        * of the reader, as channels are possibly updated during runtime.
+        */
+       public int getNumberOfInputChannels() {
+               return totalNumberOfInputChannels;
+       }
+
+       public BufferProvider getBufferProvider() {
+               return bufferPool;
+       }
+
+       public void setInputChannel(IntermediateResultPartitionID partitionId, 
InputChannel inputChannel) {
+               synchronized (requestLock) {
+                       inputChannels.put(checkNotNull(partitionId), 
checkNotNull(inputChannel));
+               }
+       }
+
+       public void updateInputChannel(PartitionInfo partitionInfo) throws 
IOException {
+               synchronized (requestLock) {
+                       if (isReleased) {
+                               // There was a race with a task failure/cancel
+                               return;
+                       }
+
+                       final IntermediateResultPartitionID partitionId = 
partitionInfo.getPartitionId();
+
+                       InputChannel current = inputChannels.get(partitionId);
+
+                       if (current.getClass() == UnknownInputChannel.class) {
+                               UnknownInputChannel unknownChannel = 
(UnknownInputChannel) current;
+
+                               InputChannel newChannel;
+
+                               if (partitionInfo.getProducerLocation() == 
PartitionLocation.REMOTE) {
+                                       newChannel = 
unknownChannel.toRemoteInputChannel(partitionInfo.getProducerAddress());
+                               }
+                               else if (partitionInfo.getProducerLocation() == 
PartitionLocation.LOCAL) {
+                                       newChannel = 
unknownChannel.toLocalInputChannel();
+                               }
+                               else {
+                                       throw new IllegalStateException("Tried 
to update unknown channel with unknown channel.");
+                               }
+
+                               inputChannels.put(partitionId, newChannel);
+
+                               
newChannel.requestIntermediateResultPartition(queueToRequest);
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       // Consume
+       // 
------------------------------------------------------------------------
+
+       void requestPartitionsOnce() throws IOException {
+               if (!hasRequestedPartitions) {
+                       // Sanity check
+                       if (totalNumberOfInputChannels != inputChannels.size()) {
+                               throw new IllegalStateException("Mismatch between total number of input channels and the current number of set input channels.");
+                       }
+                       }
+
+                       synchronized (requestLock) {
+                               for (InputChannel inputChannel : 
inputChannels.values()) {
+                                       
inputChannel.requestIntermediateResultPartition(queueToRequest);
+                               }
+                       }
+
+                       hasRequestedPartitions = true;
+               }
+       }
+
+       @Override
+       public Buffer getNextBuffer() throws IOException, InterruptedException {
+               requestPartitionsOnce();
+
+               while (true) {
+                       if (Thread.interrupted()) {
+                               throw new InterruptedException();
+                       }
+
+                       // Possibly block until data is available at one of the 
input channels
+                       InputChannel currentChannel = null;
+                       while (currentChannel == null) {
+                               currentChannel = 
inputChannelsWithData.poll(2000, TimeUnit.MILLISECONDS);
+                       }
+
+                       isTaskEvent = false;
+
+                       final Buffer buffer = currentChannel.getNextBuffer();
+
+                       if (buffer == null) {
+                               throw new IllegalStateException("Bug in reader 
logic: queried for a buffer although none was available.");
+                       }
+
+                       if (buffer.isBuffer()) {
+                               channelIndexOfLastReadBuffer = 
currentChannel.getChannelIndex();
+                               return buffer;
+                       }
+                       else {
+                               try {
+                                       final AbstractEvent event = 
EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
+
+                                       // 
------------------------------------------------------------
+                                       // Runtime events
+                                       // 
------------------------------------------------------------
+                                       // Note: We cannot assume that every channel will be finished
+                                       // with a corresponding event. In failure cases or iterations the
+                                       // consumer task finishes earlier and has to release all
+                                       // resources.
+                                       // 
------------------------------------------------------------
+                                       if (event.getClass() == 
EndOfPartitionEvent.class) {
+                                               
currentChannel.releaseAllResources();
+
+                                               return null;
+                                       }
+                                       else if (event.getClass() == 
EndOfSuperstepEvent.class) {
+                                               
incrementEndOfSuperstepEventAndCheck();
+
+                                               return null;
+                                       }
+                                       // 
------------------------------------------------------------
+                                       // Task events (user)
+                                       // 
------------------------------------------------------------
+                                       else if (event instanceof TaskEvent) {
+                                               
taskEventHandler.publish((TaskEvent) event);
+
+                                               isTaskEvent = true;
+
+                                               return null;
+                                       }
+                                       else {
+                                               throw new 
IllegalStateException("Received unexpected event " + event + " from input 
channel " + currentChannel + ".");
+                                       }
+                               }
+                               catch (Throwable t) {
+                                       throw new IOException("Error while 
reading event: " + t.getMessage(), t);
+                               }
+                               finally {
+                                       buffer.recycle();
+                               }
+                       }
+               }
+       }
+
+       @Override
+       public Buffer getNextBuffer(Buffer exchangeBuffer) {
+               throw new UnsupportedOperationException("Buffer exchange when 
reading data is not yet supported.");
+       }
+
+       @Override
+       public int getChannelIndexOfLastBuffer() {
+               return channelIndexOfLastReadBuffer;
+       }
+
+       @Override
+       public boolean isTaskEvent() {
+               return isTaskEvent;
+       }
+
+       @Override
+       public boolean isFinished() {
+               synchronized (requestLock) {
+                       for (InputChannel inputChannel : 
inputChannels.values()) {
+                               if (!inputChannel.isReleased()) {
+                                       return false;
+                               }
+                       }
+               }
+
+               return true;
+       }
+
+       public void releaseAllResources() throws IOException {
+               synchronized (requestLock) {
+                       if (!isReleased) {
+                               try {
+                                       for (InputChannel inputChannel : 
inputChannels.values()) {
+                                               try {
+                                                       
inputChannel.releaseAllResources();
+                                               }
+                                               catch (IOException e) {
+                                                       LOG.warn("Error during 
release of channel resources: " + e.getMessage(), e);
+                                               }
+                                       }
+
+                                       // The buffer pool can actually be 
destroyed immediately after the
+                                       // reader received all of the data from 
the input channels.
+                                       if (bufferPool != null) {
+                                               bufferPool.destroy();
+                                       }
+                               }
+                               finally {
+                                       isReleased = true;
+                               }
+                       }
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       // Channel notifications
+       // 
------------------------------------------------------------------------
+
+       public void onAvailableInputChannel(InputChannel inputChannel) {
+               inputChannelsWithData.add(inputChannel);
+
+               if (readerListener.get() != null) {
+                       readerListener.get().onEvent(this);
+               }
+       }
+
+       void subscribeToReader(EventListener<BufferReader> listener) {
+               if (!this.readerListener.compareAndSet(null, listener)) {
+                       throw new IllegalStateException(listener + " is already 
registered as a record availability listener");
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       // Task events
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void sendTaskEvent(TaskEvent event) throws IOException, 
InterruptedException {
+               // This can be improved by just serializing the event once for 
all
+               // remote input channels.
+               synchronized (requestLock) {
+                       for (InputChannel inputChannel : 
inputChannels.values()) {
+                               inputChannel.sendTaskEvent(event);
+                       }
+               }
+       }
+
+       @Override
+       public void subscribeToTaskEvent(EventListener<TaskEvent> listener, 
Class<? extends TaskEvent> eventType) {
+               taskEventHandler.subscribe(listener, eventType);
+       }
+
+       // 
------------------------------------------------------------------------
+       // Iteration end of superstep events
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void setIterativeReader() {
+               isIterativeReader = true;
+       }
+
+       @Override
+       public void startNextSuperstep() {
+               checkState(isIterativeReader, "Tried to start next superstep in 
a non-iterative reader.");
+               checkState(currentNumEndOfSuperstepEvents == 
totalNumberOfInputChannels,
+                               "Tried to start next superstep before reaching 
end of previous superstep.");
+
+               currentNumEndOfSuperstepEvents = 0;
+       }
+
+       @Override
+       public boolean hasReachedEndOfSuperstep() {
+               return currentNumEndOfSuperstepEvents == 
totalNumberOfInputChannels;
+       }
+
+       private boolean incrementEndOfSuperstepEventAndCheck() {
+               checkState(isIterativeReader, "Received end of superstep event 
in a non-iterative reader.");
+
+               currentNumEndOfSuperstepEvents++;
+
+               checkState(currentNumEndOfSuperstepEvents <= 
totalNumberOfInputChannels,
+                               "Received too many (" + 
currentNumEndOfSuperstepEvents + ") end of superstep events.");
+
+               return currentNumEndOfSuperstepEvents == 
totalNumberOfInputChannels;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               return String.format("BufferReader %s [task: %s, current/total 
number of input channels: %d/%d]",
+                               consumedResultId, getTaskNameWithSubtasks(), 
inputChannels.size(), totalNumberOfInputChannels);
+       }
+
+       public static BufferReader create(RuntimeEnvironment 
runtimeEnvironment, NetworkEnvironment networkEnvironment, 
PartitionConsumerDeploymentDescriptor desc) {
+               // The consumed intermediate data set (all partitions are part 
of this data set)
+               final IntermediateDataSetID resultId = desc.getResultId();
+
+               // The queue to request from each consumed partition
+               final int queueIndex = desc.getQueueIndex();
+
+               // There is one input channel for each consumed partition
+               final PartitionInfo[] partitions = desc.getPartitions();
+               final int numberOfInputChannels = partitions.length;
+
+               final BufferReader reader = new 
BufferReader(runtimeEnvironment, networkEnvironment, resultId, 
numberOfInputChannels, queueIndex);
+
+               // Create input channels
+               final InputChannel[] inputChannels = new 
InputChannel[numberOfInputChannels];
+
+               int channelIndex = 0;
+
+               for (PartitionInfo partition : partitions) {
+                       final ExecutionAttemptID producerExecutionId = 
partition.getProducerExecutionId();
+                       final IntermediateResultPartitionID partitionId = 
partition.getPartitionId();
+
+                       final PartitionLocation producerLocation = 
partition.getProducerLocation();
+
+                       switch (producerLocation) {
+                               case LOCAL:
+                                       inputChannels[channelIndex] = new 
LocalInputChannel(channelIndex, producerExecutionId, partitionId, reader);
+                                       break;
+
+                               case REMOTE:
+                                       final RemoteAddress producerAddress = 
checkNotNull(partition.getProducerAddress(), "Missing producer address for 
remote intermediate result partition.");
+
+                                       inputChannels[channelIndex] = new 
RemoteInputChannel(channelIndex, producerExecutionId, partitionId, reader, 
producerAddress);
+                                       break;
+
+                               case UNKNOWN:
+                                       inputChannels[channelIndex] = new 
UnknownInputChannel(channelIndex, producerExecutionId, partitionId, reader);
+                                       break;
+                       }
+
+                       reader.setInputChannel(partitionId, 
inputChannels[channelIndex]);
+
+                       channelIndex++;
+               }
+
+               return reader;
+       }
+}
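One detail of the reader above worth calling out is the superstep bookkeeping: every input channel is expected to deliver exactly one EndOfSuperstepEvent, the superstep is complete once the count equals the total number of channels, and the counter is reset when the next superstep starts. A minimal sketch of just that counting logic (hypothetical class, not the actual Flink type):

public class SuperstepCounterSketch {

    private final int totalChannels;
    private int endOfSuperstepEvents;

    public SuperstepCounterSketch(int totalChannels) {
        this.totalChannels = totalChannels;
    }

    /** Returns true once all channels have reported the end of the superstep. */
    public boolean onEndOfSuperstepEvent() {
        endOfSuperstepEvents++;
        if (endOfSuperstepEvents > totalChannels) {
            throw new IllegalStateException("Received too many end of superstep events.");
        }
        return endOfSuperstepEvents == totalChannels;
    }

    /** Resets the counter when the next superstep starts. */
    public void startNextSuperstep() {
        if (endOfSuperstepEvents != totalChannels) {
            throw new IllegalStateException("Previous superstep has not finished yet.");
        }
        endOfSuperstepEvents = 0;
    }

    public static void main(String[] args) {
        SuperstepCounterSketch counter = new SuperstepCounterSketch(3);
        System.out.println(counter.onEndOfSuperstepEvent()); // false
        System.out.println(counter.onEndOfSuperstepEvent()); // false
        System.out.println(counter.onEndOfSuperstepEvent()); // true
        counter.startNextSuperstep();
    }
}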

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java
new file mode 100644
index 0000000..04fae71
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+
+import java.io.IOException;
+
+/**
+ * A buffer-oriented runtime result reader.
+ * <p>
+ * {@link BufferReaderBase} is the runtime API for consuming results. Events
+ * are handled by the reader and users can query for buffers with
+ * {@link #getNextBuffer()} or {@link #getNextBuffer(Buffer)}.
+ * <p>
+ * <strong>Important</strong>: If {@link #getNextBuffer()} is used, it is
+ * necessary to release the returned buffers with {@link Buffer#recycle()}
+ * after they are consumed.
+ */
+public interface BufferReaderBase extends ReaderBase {
+
+       /**
+        * Returns the next queued {@link Buffer} from one of the {@link RemoteInputChannel}
+        * instances attached to this reader. There are no ordering guarantees with
+        * respect to which channel is queried for data.
+        * <p>
+        * <strong>Important</strong>: it is necessary to release buffers, which
+        * are returned by the reader via {@link Buffer#recycle()}, because they
+        * are a pooled resource. If not recycled, the network stack will run 
out
+        * of buffers and deadlock.
+        *
+        * @see #getChannelIndexOfLastBuffer()
+        */
+       Buffer getNextBuffer() throws IOException, InterruptedException;
+
+       /**
+        * {@link #getNextBuffer()} requires the user to quickly recycle the
+        * returned buffer. For a fully buffer-oriented runtime, we need to
+        * support a variant of this method, which allows buffers to be 
exchanged
+        * in order to save unnecessary memory copies between buffer pools.
+        * <p>
+        * Currently this is not a problem, because the only "users" of the 
buffer-
+        * oriented API are the record-oriented readers, which immediately
+        * deserialize the buffer and recycle it.
+        */
+       Buffer getNextBuffer(Buffer exchangeBuffer) throws IOException, 
InterruptedException;
+
+       /**
+        * Returns a channel index for the last {@link Buffer} instance 
returned by
+        * {@link #getNextBuffer()} or {@link #getNextBuffer(Buffer)}.
+        * <p>
+        * The returned index is guaranteed to be the same for all buffers read 
by
+        * the same {@link RemoteInputChannel} instance. This is useful when 
data spans
+        * multiple buffers returned by this reader.
+        * <p>
+        * Initially returns <code>-1</code> and if multiple readers are 
unioned,
+        * the local channel indexes are mapped to the sequence from 0 to n-1.
+        */
+       int getChannelIndexOfLastBuffer();
+
+       /**
+        * Returns the total number of {@link InputChannel} instances, from 
which this
+        * reader gets its data.
+        */
+       int getNumberOfInputChannels();
+
+       boolean isTaskEvent();
+}
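As the javadoc stresses, buffers handed out by getNextBuffer() are a pooled resource and must be recycled, otherwise the network stack eventually runs out of buffers and deadlocks. Below is a hedged sketch of a consumer loop that always recycles, using stand-in Buffer/Reader interfaces rather than the real Flink types:

public class RecycleLoopSketch {

    interface Buffer {
        void recycle();
    }

    interface Reader {
        Buffer getNextBuffer() throws Exception;

        boolean isFinished();
    }

    static void consume(Reader reader) throws Exception {
        while (!reader.isFinished()) {
            Buffer buffer = reader.getNextBuffer();
            if (buffer == null) {
                continue; // an event was handled internally, no data buffer this time
            }
            try {
                // ... deserialize or otherwise process the buffer contents here ...
            }
            finally {
                buffer.recycle(); // always return the buffer to its pool
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // A toy reader that serves three buffers and then reports itself finished.
        final int[] remaining = { 3 };
        consume(new Reader() {
            public Buffer getNextBuffer() {
                remaining[0]--;
                return new Buffer() {
                    public void recycle() {
                        System.out.println("recycled a buffer");
                    }
                };
            }

            public boolean isFinished() {
                return remaining[0] == 0;
            }
        });
    }
}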

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java
new file mode 100644
index 0000000..e47982e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+
+/**
+ * A record-oriented reader for mutable record types.
+ */
+public interface MutableReader<T extends IOReadableWritable> extends 
ReaderBase {
+
+       boolean next(T target) throws IOException, InterruptedException;
+
+       void clearBuffers();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java
new file mode 100644
index 0000000..75d4f21
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import org.apache.flink.core.io.IOReadableWritable;
+
+import java.io.IOException;
+
+public class MutableRecordReader<T extends IOReadableWritable> extends 
AbstractRecordReader<T> implements MutableReader<T> {
+
+       public MutableRecordReader(BufferReaderBase reader) {
+               super(reader);
+       }
+
+       @Override
+       public boolean next(final T target) throws IOException, 
InterruptedException {
+               return getNextRecord(target);
+       }
+
+       @Override
+       public void clearBuffers() {
+               super.clearBuffers();
+       }
+}
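The point of the mutable reader above is that the caller supplies a single target object that is filled in place on every next() call, avoiding one allocation per record. An illustrative, self-contained sketch of that usage pattern (the toy reader and record type are hypothetical, not the Flink classes):

import java.util.Arrays;
import java.util.Iterator;

public class MutableReaderSketch {

    /** A toy mutable record; a real job would use an IOReadableWritable type. */
    static class IntRecord {
        int value;
    }

    /** A toy "reader" that fills the caller-provided record in place. */
    static class ToyMutableReader {
        private final Iterator<Integer> source = Arrays.asList(1, 2, 3).iterator();

        boolean next(IntRecord target) {
            if (!source.hasNext()) {
                return false;
            }
            target.value = source.next();
            return true;
        }
    }

    public static void main(String[] args) {
        ToyMutableReader reader = new ToyMutableReader();
        IntRecord reuse = new IntRecord(); // allocated once, reused for every record
        while (reader.next(reuse)) {
            System.out.println(reuse.value);
        }
    }
}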
