http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java new file mode 100644 index 0000000..7a529b9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network; + +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Table; +import org.apache.flink.runtime.event.task.TaskEvent; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.network.api.writer.BufferWriter; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.util.event.EventListener; + +import java.util.ArrayList; +import java.util.List; + +/** + * The task event dispatcher dispatches events flowing backwards from a consumer + * to a producer. It only supports programs, where the producer and consumer + * are running at the same time. + * <p> + * The publish method is either called from the local input channel or the + * network I/O thread. + */ +public class TaskEventDispatcher { + + Table<ExecutionAttemptID, IntermediateResultPartitionID, BufferWriter> registeredWriters = HashBasedTable.create(); + + public void registerWriterForIncomingTaskEvents(ExecutionAttemptID executionId, IntermediateResultPartitionID partitionId, BufferWriter listener) { + synchronized (registeredWriters) { + if (registeredWriters.put(executionId, partitionId, listener) != null) { + throw new IllegalStateException("Event dispatcher already contains buffer writer."); + } + } + } + + public void unregisterWriters(ExecutionAttemptID executionId) { + synchronized (registeredWriters) { + List<IntermediateResultPartitionID> writersToUnregister = new ArrayList<IntermediateResultPartitionID>(); + + for (IntermediateResultPartitionID partitionId : registeredWriters.row(executionId).keySet()) { + writersToUnregister.add(partitionId); + } + + for(IntermediateResultPartitionID partitionId : writersToUnregister) { + registeredWriters.remove(executionId, partitionId); + } + } + } + + /** + * Publishes the event to the registered {@link EventListener} instance. + * <p> + * This method is either called from a local input channel or the network + * I/O thread on behalf of a remote input channel. + */ + public boolean publish(ExecutionAttemptID executionId, IntermediateResultPartitionID partitionId, TaskEvent event) { + EventListener<TaskEvent> listener = registeredWriters.get(executionId, partitionId); + + if (listener != null) { + listener.onEvent(event); + return true; + } + + return false; + } + + int getNumberOfRegisteredWriters() { + synchronized (registeredWriters) { + return registeredWriters.size(); + } + } +}
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractRecordReader.java deleted file mode 100644 index 3af9aef..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractRecordReader.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api; - -import org.apache.flink.runtime.event.task.AbstractTaskEvent; -import org.apache.flink.runtime.event.task.EventListener; -import org.apache.flink.runtime.event.task.EventNotificationManager; - -/** - * This is an abstract base class for a record reader, either dealing with mutable or immutable records, - * and dealing with reads from single gates (single end points) or multiple gates (union). - */ -public abstract class AbstractRecordReader implements ReaderBase { - - - private final EventNotificationManager eventHandler = new EventNotificationManager(); - - private int numEventsUntilEndOfSuperstep = -1; - - private int endOfSuperstepEventsCount; - - // -------------------------------------------------------------------------------------------- - - /** - * Subscribes the listener object to receive events of the given type. - * - * @param eventListener - * the listener object to register - * @param eventType - * the type of event to register the listener for - */ - @Override - public void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) { - this.eventHandler.subscribeToEvent(eventListener, eventType); - } - - /** - * Removes the subscription for events of the given type for the listener object. - * - * @param eventListener The listener object to cancel the subscription for. - * @param eventType The type of the event to cancel the subscription for. - */ - @Override - public void unsubscribeFromEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) { - this.eventHandler.unsubscribeFromEvent(eventListener, eventType); - } - - - protected void handleEvent(AbstractTaskEvent evt) { - this.eventHandler.deliverEvent(evt); - } - - @Override - public void setIterative(int numEventsUntilEndOfSuperstep) { - this.numEventsUntilEndOfSuperstep = numEventsUntilEndOfSuperstep; - } - - @Override - public void startNextSuperstep() { - if (this.numEventsUntilEndOfSuperstep == -1) { - throw new IllegalStateException("Called 'startNextSuperstep()' in a non-iterative reader."); - } - else if (endOfSuperstepEventsCount < numEventsUntilEndOfSuperstep) { - throw new IllegalStateException("Premature 'startNextSuperstep()'. Not yet reached the end-of-superstep."); - } - this.endOfSuperstepEventsCount = 0; - } - - @Override - public boolean hasReachedEndOfSuperstep() { - return endOfSuperstepEventsCount== numEventsUntilEndOfSuperstep; - } - - protected boolean incrementEndOfSuperstepEventAndCheck() { - if (numEventsUntilEndOfSuperstep == -1) { - throw new IllegalStateException("Received EndOfSuperstep event in a non-iterative reader."); - } - - endOfSuperstepEventsCount++; - - if (endOfSuperstepEventsCount > numEventsUntilEndOfSuperstep) { - throw new IllegalStateException("Received EndOfSuperstep events beyond the number to indicate the end of the superstep"); - } - - return endOfSuperstepEventsCount== numEventsUntilEndOfSuperstep; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractSingleGateRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractSingleGateRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractSingleGateRecordReader.java deleted file mode 100644 index e308da8..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractSingleGateRecordReader.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api; - -import java.io.IOException; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.event.task.AbstractTaskEvent; -import org.apache.flink.runtime.io.network.gates.InputGate; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; - -/** - * This is an abstract base class for a record reader, either dealing with mutable or immutable records. - * - * @param <T> The type of the record that can be read from this record reader. - */ -public abstract class AbstractSingleGateRecordReader<T extends IOReadableWritable> extends AbstractRecordReader { - - /** - * The input gate associated with the record reader. - */ - protected final InputGate<T> inputGate; - - // -------------------------------------------------------------------------------------------- - - protected AbstractSingleGateRecordReader(AbstractInvokable invokable) { - this.inputGate = invokable.getEnvironment().createAndRegisterInputGate(); - } - - /** - * Returns the number of input channels wired to this reader's input gate. - * - * @return the number of input channels wired to this reader's input gate - */ - public int getNumberOfInputChannels() { - return this.inputGate.getNumberOfInputChannels(); - } - - /** - * Publishes an event. - * - * @param event - * the event to be published - * @throws IOException - * thrown if an error occurs while transmitting the event - * @throws InterruptedException - * thrown if the thread is interrupted while waiting for the event to be published - */ - @Override - public void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException { - // Delegate call to input gate to send events - this.inputGate.publishEvent(event); - } - - @Override - public void publishEvent(AbstractTaskEvent event, int inputNumber) throws IOException, InterruptedException { - if(inputNumber==0) { - publishEvent(event); - }else { - throw new IOException("RecordReader has only 1 input"); - } - } - - public InputGate<T> getInputGate() { - return this.inputGate; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractUnionRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractUnionRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractUnionRecordReader.java deleted file mode 100644 index 00ccfee..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractUnionRecordReader.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api; - -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.HashSet; -import java.util.Set; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.event.task.AbstractTaskEvent; -import org.apache.flink.runtime.io.network.gates.InputChannelResult; -import org.apache.flink.runtime.io.network.gates.InputGate; -import org.apache.flink.runtime.io.network.gates.RecordAvailabilityListener; - -public abstract class AbstractUnionRecordReader<T extends IOReadableWritable> extends AbstractRecordReader implements RecordAvailabilityListener<T> { - - /** - * The set of all input gates. - */ - private final InputGate<T>[] allInputGates; - - /** - * The set of unclosed input gates. - */ - private final Set<InputGate<T>> remainingInputGates; - - /** - * Queue with indices of channels that store at least one available record. - */ - private final ArrayDeque<InputGate<T>> availableInputGates = new ArrayDeque<InputGate<T>>(); - - /** - * The next input gate to read a record from. - */ - private InputGate<T> nextInputGateToReadFrom; - - - @Override - public boolean isInputClosed() { - return this.remainingInputGates.isEmpty(); - } - - /** - * Constructs a new mutable union record reader. - * - * @param recordReaders - * the individual mutable record readers whose input is used to construct the union - */ - @SuppressWarnings("unchecked") - protected AbstractUnionRecordReader(MutableRecordReader<T>[] recordReaders) { - - if (recordReaders == null) { - throw new IllegalArgumentException("Provided argument recordReaders is null"); - } - - if (recordReaders.length < 2) { - throw new IllegalArgumentException( - "The mutable union record reader must at least be initialized with two individual mutable record readers"); - } - - this.allInputGates = new InputGate[recordReaders.length]; - this.remainingInputGates = new HashSet<InputGate<T>>((int) (recordReaders.length * 1.6f)); - - for (int i = 0; i < recordReaders.length; i++) { - InputGate<T> inputGate = recordReaders[i].getInputGate(); - inputGate.registerRecordAvailabilityListener(this); - this.allInputGates[i] = inputGate; - this.remainingInputGates.add(inputGate); - } - } - - - @Override - public void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException { - for (InputGate<T> gate : this.allInputGates) { - gate.publishEvent(event); - } - } - - @Override - public void publishEvent(AbstractTaskEvent event, int inputNumber) throws IOException, - InterruptedException { - allInputGates[inputNumber].publishEvent(event); - } - - @Override - public void reportRecordAvailability(InputGate<T> inputGate) { - synchronized (this.availableInputGates) { - this.availableInputGates.add(inputGate); - this.availableInputGates.notifyAll(); - } - } - - protected boolean getNextRecord(T target) throws IOException, InterruptedException { - - while (true) { - // has the current input gate more data? - if (this.nextInputGateToReadFrom == null) { - if (this.remainingInputGates.isEmpty()) { - return false; - } - - this.nextInputGateToReadFrom = getNextAvailableInputGate(); - } - - InputChannelResult result = this.nextInputGateToReadFrom.readRecord(target); - switch (result) { - case INTERMEDIATE_RECORD_FROM_BUFFER: // record is available and we can stay on the same channel - return true; - - case LAST_RECORD_FROM_BUFFER: // record is available, but we need to re-check the channels - this.nextInputGateToReadFrom = null; - return true; - - case END_OF_SUPERSTEP: - this.nextInputGateToReadFrom = null; - if (incrementEndOfSuperstepEventAndCheck()) { - return false; // end of the superstep - } - else { - break; // fall through and wait for next record/event - } - - case TASK_EVENT: // event for the subscribers is available - handleEvent(this.nextInputGateToReadFrom.getCurrentEvent()); - this.nextInputGateToReadFrom = null; - break; - - case END_OF_STREAM: // one gate is empty - this.remainingInputGates.remove(this.nextInputGateToReadFrom); - this.nextInputGateToReadFrom = null; - break; - - case NONE: // gate processed an internal event and could not return a record on this call - this.nextInputGateToReadFrom = null; - break; - } - } - } - - private InputGate<T> getNextAvailableInputGate() throws InterruptedException { - synchronized (this.availableInputGates) { - while (this.availableInputGates.isEmpty()) { - this.availableInputGates.wait(); - } - return this.availableInputGates.pop(); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/BufferWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/BufferWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/BufferWriter.java deleted file mode 100644 index 8eb117d..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/BufferWriter.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api; - -import java.io.IOException; - -import org.apache.flink.runtime.event.task.AbstractEvent; -import org.apache.flink.runtime.event.task.AbstractTaskEvent; -import org.apache.flink.runtime.event.task.EventListener; -import org.apache.flink.runtime.io.network.Buffer; -import org.apache.flink.runtime.io.network.channels.EndOfSuperstepEvent; -import org.apache.flink.runtime.io.network.gates.OutputGate; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; - -public class BufferWriter { - - protected final OutputGate outputGate; - - public BufferWriter(AbstractInvokable invokable) { - this.outputGate = invokable.getEnvironment().createAndRegisterOutputGate(); - } - - public void sendBuffer(Buffer buffer, int targetChannel) throws IOException, InterruptedException { - this.outputGate.sendBuffer(buffer, targetChannel); - } - - public void sendEvent(AbstractEvent event, int targetChannel) throws IOException, InterruptedException { - this.outputGate.sendEvent(event, targetChannel); - } - - public void sendBufferAndEvent(Buffer buffer, AbstractEvent event, int targetChannel) throws IOException, InterruptedException { - this.outputGate.sendBufferAndEvent(buffer, event, targetChannel); - } - - public void broadcastBuffer(Buffer buffer) throws IOException, InterruptedException { - this.outputGate.broadcastBuffer(buffer); - } - - public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException { - this.outputGate.broadcastEvent(event); - } - - // ----------------------------------------------------------------------------------------------------------------- - - public void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) { - this.outputGate.subscribeToEvent(eventListener, eventType); - } - - public void unsubscribeFromEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) { - this.outputGate.unsubscribeFromEvent(eventListener, eventType); - } - - public void sendEndOfSuperstep() throws IOException, InterruptedException { - this.outputGate.broadcastEvent(EndOfSuperstepEvent.INSTANCE); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ChannelSelector.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ChannelSelector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ChannelSelector.java deleted file mode 100644 index c780f87..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ChannelSelector.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.api; - -import org.apache.flink.core.io.IOReadableWritable; - -/** - * Objects implementing this interface are passed to an {@link org.apache.flink.runtime.io.network.gates.OutputGate}. When a record is sent through the output - * gate, the channel selector object is called to determine to which {@link org.apache.flink.runtime.io.network.channels.OutputChannel} objects the record - * shall be passed on. - * - * @param <T> - * the type of record which is sent through the attached output gate - */ -public interface ChannelSelector<T extends IOReadableWritable> { - - /** - * Called to determine to which attached {@link org.apache.flink.runtime.io.network.channels.OutputChannel} objects the given record shall be forwarded. - * - * @param record - * the record to the determine the output channels for - * @param numberOfOutputChannels - * the total number of output channels which are attached to respective output gate - * @return a (possibly empty) array of integer numbers which indicate the indices of the output channels through - * which the record shall be forwarded - */ - int[] selectChannels(T record, int numberOfOutputChannels); -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java new file mode 100644 index 0000000..49d7958 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.event.task.RuntimeEvent; + +import java.io.IOException; + +public class EndOfPartitionEvent extends RuntimeEvent { + + public static final EndOfPartitionEvent INSTANCE = new EndOfPartitionEvent(); + + @Override + public void read(DataInputView in) throws IOException { + // Nothing to do here + } + + @Override + public void write(DataOutputView out) throws IOException { + // Nothing to do here + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java new file mode 100644 index 0000000..5d0199c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api; + +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.event.task.RuntimeEvent; + +import java.io.IOException; + +/** + * Marks the end of a superstep of one particular iteration head + */ +public class EndOfSuperstepEvent extends RuntimeEvent { + + public static final EndOfSuperstepEvent INSTANCE = new EndOfSuperstepEvent(); + + @Override + public void write(DataOutputView out) throws IOException { + } + + @Override + public void read(DataInputView in) throws IOException { + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java deleted file mode 100644 index 04027f6..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.api; - -import java.io.IOException; - -import org.apache.flink.core.io.IOReadableWritable; - -public interface MutableReader<T extends IOReadableWritable> extends ReaderBase { - - boolean next(T target) throws IOException, InterruptedException; -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableRecordReader.java deleted file mode 100644 index e3a9522..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableRecordReader.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api; - -import java.io.IOException; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.io.network.gates.InputChannelResult; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; - -public class MutableRecordReader<T extends IOReadableWritable> extends AbstractSingleGateRecordReader<T> implements MutableReader<T> { - - private boolean endOfStream; - - - /** - * Constructs a new mutable record reader and registers a new input gate with the application's environment. - * - * @param taskBase The application that instantiated the record reader. - */ - public MutableRecordReader(AbstractInvokable taskBase) { - super(taskBase); - } - - // -------------------------------------------------------------------------------------------- - - @Override - public boolean next(final T target) throws IOException, InterruptedException { - if (this.endOfStream) { - return false; - - } - while (true) { - InputChannelResult result = this.inputGate.readRecord(target); - switch (result) { - case INTERMEDIATE_RECORD_FROM_BUFFER: - case LAST_RECORD_FROM_BUFFER: - return true; - - case END_OF_SUPERSTEP: - if (incrementEndOfSuperstepEventAndCheck()) { - return false; // end of the superstep - } - else { - break; // fall through and wait for next record/event - } - - case TASK_EVENT: - handleEvent(this.inputGate.getCurrentEvent()); - break; // fall through to get next record - - case END_OF_STREAM: - this.endOfStream = true; - return false; - - default: - ; // fall through to get next record - } - } - } - - @Override - public boolean isInputClosed() { - return this.endOfStream; - } - - @Override - public void setIterative(int numEventsUntilEndOfSuperstep) { - // sanity check for debug purposes - if (numEventsUntilEndOfSuperstep != getNumberOfInputChannels()) { - throw new IllegalArgumentException("Number of events till end of superstep is different from the number of input channels."); - } - super.setIterative(numEventsUntilEndOfSuperstep); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableUnionRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableUnionRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableUnionRecordReader.java deleted file mode 100644 index 2f6e2d2..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableUnionRecordReader.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api; - -import java.io.IOException; - -import org.apache.flink.core.io.IOReadableWritable; - -public class MutableUnionRecordReader<T extends IOReadableWritable> extends AbstractUnionRecordReader<T> implements MutableReader<T> { - - - /** - * Constructs a new mutable union record reader. - * - * @param recordReaders - * the individual mutable record readers whose input is used to construct the union - */ - public MutableUnionRecordReader(MutableRecordReader<T>[] recordReaders) { - super(recordReaders); - } - - @Override - public boolean next(T target) throws IOException, InterruptedException { - return getNextRecord(target); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/Reader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/Reader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/Reader.java deleted file mode 100644 index 9be0978..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/Reader.java +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api; - -import java.io.IOException; - -import org.apache.flink.core.io.IOReadableWritable; - -/** - * A reader interface to read records from an input. - * - * @param <T> The type of the record that can be emitted with this record writer - */ -public interface Reader<T extends IOReadableWritable> extends ReaderBase { - - boolean hasNext() throws IOException, InterruptedException; - - T next() throws IOException, InterruptedException; -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ReaderBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ReaderBase.java deleted file mode 100644 index 0b8069a..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ReaderBase.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network.api; - -import java.io.IOException; - -import org.apache.flink.runtime.event.task.AbstractTaskEvent; -import org.apache.flink.runtime.event.task.EventListener; - - -/** - * - */ -public interface ReaderBase { - - boolean isInputClosed(); - - /** - * Subscribes the listener object to receive events of the given type. - * - * @param eventListener - * the listener object to register - * @param eventType - * the type of event to register the listener for - */ - void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType); - - /** - * Removes the subscription for events of the given type for the listener object. - * - * @param eventListener - * the listener object to cancel the subscription for - * @param eventType - * the type of the event to cancel the subscription for - */ - void unsubscribeFromEvent(final EventListener eventListener, final Class<? extends AbstractTaskEvent> eventType); - - /** - * Publishes an event. - * - * @param event - * the event to be published - * @throws IOException - * thrown if an error occurs while transmitting the event - * @throws InterruptedException - * thrown if the thread is interrupted while waiting for the event to be published - */ - void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException; - - /** - * Publishes an event to a specific input. - * - * @param event - * the event to be published - * @param inputNumber - * the number of the input that we want to publish the event to - * - * @throws IOException - * thrown if an error occurs while transmitting the event - * @throws InterruptedException - * thrown if the thread is interrupted while waiting for the - * event to be published - */ - void publishEvent(AbstractTaskEvent event, int inputNumber) throws IOException, InterruptedException; - - - void setIterative(int numEventsUntilEndOfSuperstep); - - - void startNextSuperstep(); - - boolean hasReachedEndOfSuperstep(); -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordReader.java deleted file mode 100644 index fc97736..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordReader.java +++ /dev/null @@ -1,146 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api; - -import java.io.IOException; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.io.network.gates.InputChannelResult; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; - -/** - * A record writer connects an input gate to an application. It allows the application - * query for incoming records and read them from input gate. - * - * @param <T> The type of the record that can be read from this record reader. - */ -public class RecordReader<T extends IOReadableWritable> extends AbstractSingleGateRecordReader<T> implements Reader<T> { - - private final Class<T> recordType; - - /** - * Stores the last read record. - */ - private T lookahead; - - /** - * Stores if more no more records will be received from the assigned input gate. - */ - private boolean noMoreRecordsWillFollow; - - // -------------------------------------------------------------------------------------------- - - /** - * Constructs a new record reader and registers a new input gate with the application's environment. - * - * @param taskBase - * The application that instantiated the record reader. - * @param recordType - * The class of records that can be read from the record reader. - */ - public RecordReader(AbstractInvokable taskBase, Class<T> recordType) { - super(taskBase); - this.recordType = recordType; - } - - // -------------------------------------------------------------------------------------------- - - /** - * Checks if at least one more record can be read from the associated input gate. This method may block - * until the associated input gate is able to read the record from one of its input channels. - * - * @return <code>true</code>it at least one more record can be read from the associated input gate, otherwise - * <code>false</code> - */ - @Override - public boolean hasNext() throws IOException, InterruptedException{ - if (this.lookahead != null) { - return true; - } else { - if (this.noMoreRecordsWillFollow) { - return false; - } - - T record = instantiateRecordType(); - - while (true) { - InputChannelResult result = this.inputGate.readRecord(record); - switch (result) { - case INTERMEDIATE_RECORD_FROM_BUFFER: - case LAST_RECORD_FROM_BUFFER: - this.lookahead = record; - return true; - - case END_OF_SUPERSTEP: - if (incrementEndOfSuperstepEventAndCheck()) { - return false; - } - else { - break; // fall through and wait for next record/event - } - - case TASK_EVENT: - handleEvent(this.inputGate.getCurrentEvent()); - break; - - case END_OF_STREAM: - this.noMoreRecordsWillFollow = true; - return false; - - default: - ; // fall through the loop - } - } - } - } - - /** - * Reads the current record from the associated input gate. - * - * @return the current record from the associated input gate. - * @throws IOException - * thrown if any error occurs while reading the record from the input gate - */ - @Override - public T next() throws IOException, InterruptedException { - if (hasNext()) { - T tmp = this.lookahead; - this.lookahead = null; - return tmp; - } else { - return null; - } - } - - @Override - public boolean isInputClosed() { - return this.noMoreRecordsWillFollow; - } - - private T instantiateRecordType() { - try { - return this.recordType.newInstance(); - } catch (InstantiationException e) { - throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e); - } catch (IllegalAccessException e) { - throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordWriter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordWriter.java deleted file mode 100644 index 3ddd564..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordWriter.java +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api; - -import java.io.IOException; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.event.task.AbstractEvent; -import org.apache.flink.runtime.io.network.Buffer; -import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider; -import org.apache.flink.runtime.io.network.channels.EndOfSuperstepEvent; -import org.apache.flink.runtime.io.network.serialization.RecordSerializer; -import org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; - -/** - * A record writer connects the application to an output gate. It allows the application - * of emit (send out) to the output gate. The output gate will then take care of distributing - * the emitted records among the output channels. - * - * @param <T> - * the type of the record that can be emitted with this record writer - */ -public class RecordWriter<T extends IOReadableWritable> extends BufferWriter { - - private final BufferProvider bufferPool; - - private final ChannelSelector<T> channelSelector; - - private int numChannels; - - /** RecordSerializer per outgoing channel */ - private RecordSerializer<T>[] serializers; - - // ----------------------------------------------------------------------------------------------------------------- - - public RecordWriter(AbstractInvokable invokable) { - this(invokable, new RoundRobinChannelSelector<T>()); - } - - public RecordWriter(AbstractInvokable invokable, ChannelSelector<T> channelSelector) { - // initialize the gate - super(invokable); - - this.bufferPool = invokable.getEnvironment().getOutputBufferProvider(); - this.channelSelector = channelSelector; - } - - // ----------------------------------------------------------------------------------------------------------------- - - @SuppressWarnings("unchecked") - public void initializeSerializers() { - this.numChannels = this.outputGate.getNumChannels(); - this.serializers = new RecordSerializer[numChannels]; - for (int i = 0; i < this.numChannels; i++) { - this.serializers[i] = new SpanningRecordSerializer<T>(); - } - } - - public void emit(final T record) throws IOException, InterruptedException { - for (int targetChannel : this.channelSelector.selectChannels(record, this.numChannels)) { - // serialize with corresponding serializer and send full buffer - RecordSerializer<T> serializer = this.serializers[targetChannel]; - - RecordSerializer.SerializationResult result = serializer.addRecord(record); - while (result.isFullBuffer()) { - Buffer buffer = serializer.getCurrentBuffer(); - if (buffer != null) { - sendBuffer(buffer, targetChannel); - } - - buffer = this.bufferPool.requestBufferBlocking(this.bufferPool.getBufferSize()); - result = serializer.setNextBuffer(buffer); - } - } - } - - public void flush() throws IOException, InterruptedException { - for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) { - RecordSerializer<T> serializer = this.serializers[targetChannel]; - - Buffer buffer = serializer.getCurrentBuffer(); - if (buffer != null) { - sendBuffer(buffer, targetChannel); - } - - serializer.clear(); - } - } - - @Override - public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException { - for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) { - RecordSerializer<T> serializer = this.serializers[targetChannel]; - - Buffer buffer = serializer.getCurrentBuffer(); - if (buffer == null) { - super.sendEvent(event, targetChannel); - } else { - super.sendBufferAndEvent(buffer, event, targetChannel); - - buffer = this.bufferPool.requestBufferBlocking(this.bufferPool.getBufferSize()); - serializer.setNextBuffer(buffer); - } - } - } - - @Override - public void sendEndOfSuperstep() throws IOException, InterruptedException { - for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) { - RecordSerializer<T> serializer = this.serializers[targetChannel]; - - Buffer buffer = serializer.getCurrentBuffer(); - if (buffer == null) { - super.sendEvent(EndOfSuperstepEvent.INSTANCE, targetChannel); - } else { - super.sendBufferAndEvent(buffer, EndOfSuperstepEvent.INSTANCE, targetChannel); - - buffer = this.bufferPool.requestBufferBlocking(this.bufferPool.getBufferSize()); - serializer.setNextBuffer(buffer); - } - } - } - - public void clearBuffers() { - if (this.serializers != null) { - for (RecordSerializer<?> s: this.serializers) { - Buffer b = s.getCurrentBuffer(); - if (b != null) { - b.recycleBuffer(); - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RoundRobinChannelSelector.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RoundRobinChannelSelector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RoundRobinChannelSelector.java deleted file mode 100644 index 668046f..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RoundRobinChannelSelector.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api; - -import org.apache.flink.core.io.IOReadableWritable; - -/** - * This is the default implementation of the {@link ChannelSelector} interface. It represents a simple round-robin - * strategy, i.e. regardless of the record every attached exactly one output channel is selected at a time. - - * @param <T> - * the type of record which is sent through the attached output gate - */ -public class RoundRobinChannelSelector<T extends IOReadableWritable> implements ChannelSelector<T> { - - /** - * Stores the index of the channel to send the next record to. - */ - private final int[] nextChannelToSendTo = new int[1]; - - /** - * Constructs a new default channel selector. - */ - public RoundRobinChannelSelector() { - this.nextChannelToSendTo[0] = 0; - } - - - @Override - public int[] selectChannels(final T record, final int numberOfOutputChannels) { - - this.nextChannelToSendTo[0] = (this.nextChannelToSendTo[0] + 1) % numberOfOutputChannels; - - return this.nextChannelToSendTo; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/UnionRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/UnionRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/UnionRecordReader.java deleted file mode 100644 index 2f7ba1d..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/UnionRecordReader.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network.api; - -import java.io.IOException; - -import org.apache.flink.core.io.IOReadableWritable; - -public final class UnionRecordReader<T extends IOReadableWritable> extends AbstractUnionRecordReader<T> implements Reader<T> { - - private final Class<T> recordType; - - private T lookahead; - - - public UnionRecordReader(MutableRecordReader<T>[] recordReaders, Class<T> recordType) { - super(recordReaders); - this.recordType = recordType; - } - - @Override - public boolean hasNext() throws IOException, InterruptedException { - if (this.lookahead != null) { - return true; - } else { - T record = instantiateRecordType(); - if (getNextRecord(record)) { - this.lookahead = record; - return true; - } else { - return false; - } - } - } - - @Override - public T next() throws IOException, InterruptedException { - if (hasNext()) { - T tmp = this.lookahead; - this.lookahead = null; - return tmp; - } else { - return null; - } - } - - private T instantiateRecordType() { - try { - return this.recordType.newInstance(); - } catch (InstantiationException e) { - throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e); - } catch (IllegalAccessException e) { - throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java new file mode 100644 index 0000000..cf4c302 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api.reader; + +import java.io.IOException; + +import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.runtime.event.task.TaskEvent; +import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.util.event.EventListener; + +/** + * A record-oriented runtime result reader, which wraps a {@link BufferReaderBase}. + * <p> + * This abstract base class is used by both the mutable and immutable record + * reader. + * + * @param <T> The type of the record that can be read with this record reader. + */ +abstract class AbstractRecordReader<T extends IOReadableWritable> implements ReaderBase { + + private final BufferReaderBase reader; + + private final RecordDeserializer<T>[] recordDeserializers; + + private RecordDeserializer<T> currentRecordDeserializer; + + private boolean isFinished; + + protected AbstractRecordReader(BufferReaderBase reader) { + this.reader = reader; + + // Initialize one deserializer per input channel + this.recordDeserializers = new AdaptiveSpanningRecordDeserializer[reader.getNumberOfInputChannels()]; + for (int i = 0; i < recordDeserializers.length; i++) { + recordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T>(); + } + } + + protected boolean getNextRecord(T target) throws IOException, InterruptedException { + if (isFinished) { + return false; + } + + while (true) { + if (currentRecordDeserializer != null) { + DeserializationResult result = currentRecordDeserializer.getNextRecord(target); + + if (result.isBufferConsumed()) { + currentRecordDeserializer.getCurrentBuffer().recycle(); + currentRecordDeserializer = null; + } + + if (result.isFullRecord()) { + return true; + } + } + + final Buffer nextBuffer = reader.getNextBuffer(); + final int channelIndex = reader.getChannelIndexOfLastBuffer(); + + if (nextBuffer == null) { + if (reader.isFinished()) { + isFinished = true; + return false; + } + else if (reader.hasReachedEndOfSuperstep()) { + return false; + } + else { + // More data is coming... + continue; + } + } + + currentRecordDeserializer = recordDeserializers[channelIndex]; + currentRecordDeserializer.setNextBuffer(nextBuffer); + } + } + + public void clearBuffers() { + for (RecordDeserializer<?> deserializer : recordDeserializers) { + Buffer buffer = deserializer.getCurrentBuffer(); + if (buffer != null && !buffer.isRecycled()) { + buffer.recycle(); + } + } + } + + @Override + public void sendTaskEvent(TaskEvent event) throws IOException, InterruptedException { + reader.sendTaskEvent(event); + } + + @Override + public boolean isFinished() { + return reader.isFinished(); + } + + @Override + public void subscribeToTaskEvent(EventListener<TaskEvent> eventListener, Class<? extends TaskEvent> eventType) { + reader.subscribeToTaskEvent(eventListener, eventType); + } + + @Override + public void setIterativeReader() { + reader.setIterativeReader(); + } + + @Override + public void startNextSuperstep() { + reader.startNextSuperstep(); + } + + @Override + public boolean hasReachedEndOfSuperstep() { + return reader.hasReachedEndOfSuperstep(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java new file mode 100644 index 0000000..1df7216 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java @@ -0,0 +1,485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api.reader; + +import com.google.common.collect.Maps; +import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor; +import org.apache.flink.runtime.deployment.PartitionInfo; +import org.apache.flink.runtime.deployment.PartitionInfo.PartitionLocation; +import org.apache.flink.runtime.event.task.AbstractEvent; +import org.apache.flink.runtime.event.task.TaskEvent; +import org.apache.flink.runtime.execution.RuntimeEnvironment; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.network.ConnectionManager; +import org.apache.flink.runtime.io.network.NetworkEnvironment; +import org.apache.flink.runtime.io.network.RemoteAddress; +import org.apache.flink.runtime.io.network.TaskEventDispatcher; +import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent; +import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent; +import org.apache.flink.runtime.io.network.api.serialization.EventSerializer; +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.BufferProvider; +import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionProvider; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.UnknownInputChannel; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.util.event.EventListener; +import org.apache.flink.runtime.util.event.EventNotificationHandler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +public final class BufferReader implements BufferReaderBase { + + private static final Logger LOG = LoggerFactory.getLogger(BufferReader.class); + + private final Object requestLock = new Object(); + + private final RuntimeEnvironment environment; + + private final NetworkEnvironment networkEnvironment; + + private final EventNotificationHandler<TaskEvent> taskEventHandler = new EventNotificationHandler<TaskEvent>(); + + private final IntermediateDataSetID consumedResultId; + + private final int totalNumberOfInputChannels; + + private final int queueToRequest; + + private final Map<IntermediateResultPartitionID, InputChannel> inputChannels; + + private BufferPool bufferPool; + + private boolean isReleased; + + private boolean isTaskEvent; + + // ------------------------------------------------------------------------ + + private final BlockingQueue<InputChannel> inputChannelsWithData = new LinkedBlockingQueue<InputChannel>(); + + private final AtomicReference<EventListener<BufferReader>> readerListener = new AtomicReference<EventListener<BufferReader>>(null); + + // ------------------------------------------------------------------------ + + private boolean isIterativeReader; + + private int currentNumEndOfSuperstepEvents; + + private int channelIndexOfLastReadBuffer = -1; + + private boolean hasRequestedPartitions = false; + + public BufferReader(RuntimeEnvironment environment, NetworkEnvironment networkEnvironment, IntermediateDataSetID consumedResultId, int numberOfInputChannels, int queueToRequest) { + + this.consumedResultId = checkNotNull(consumedResultId); + // Note: the environment is not fully initialized yet + this.environment = checkNotNull(environment); + + this.networkEnvironment = networkEnvironment; + + checkArgument(numberOfInputChannels >= 0); + this.totalNumberOfInputChannels = numberOfInputChannels; + + checkArgument(queueToRequest >= 0); + this.queueToRequest = queueToRequest; + + this.inputChannels = Maps.newHashMapWithExpectedSize(numberOfInputChannels); + } + + // ------------------------------------------------------------------------ + // Properties + // ------------------------------------------------------------------------ + + public void setBufferPool(BufferPool bufferPool) { + checkArgument(bufferPool.getNumberOfRequiredMemorySegments() == totalNumberOfInputChannels, "Buffer pool has not enough buffers for this reader."); + checkState(this.bufferPool == null, "Buffer pool has already been set for reader."); + + this.bufferPool = checkNotNull(bufferPool); + } + + public IntermediateDataSetID getConsumedResultId() { + return consumedResultId; + } + + public String getTaskNameWithSubtasks() { + return environment.getTaskNameWithSubtasks(); + } + + public IntermediateResultPartitionProvider getIntermediateResultPartitionProvider() { + return networkEnvironment.getPartitionManager(); + } + + public TaskEventDispatcher getTaskEventDispatcher() { + return networkEnvironment.getTaskEventDispatcher(); + } + + public ConnectionManager getConnectionManager() { + return networkEnvironment.getConnectionManager(); + } + + // TODO This is a work-around for the union reader + boolean hasInputChannelWithData() { + return !inputChannelsWithData.isEmpty(); + } + + /** + * Returns the total number of input channels for this reader. + * <p> + * Note: This number might be smaller the current number of input channels + * of the reader as channels are possibly updated during runtime. + */ + public int getNumberOfInputChannels() { + return totalNumberOfInputChannels; + } + + public BufferProvider getBufferProvider() { + return bufferPool; + } + + public void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) { + synchronized (requestLock) { + inputChannels.put(checkNotNull(partitionId), checkNotNull(inputChannel)); + } + } + + public void updateInputChannel(PartitionInfo partitionInfo) throws IOException { + synchronized (requestLock) { + if (isReleased) { + // There was a race with a task failure/cancel + return; + } + + final IntermediateResultPartitionID partitionId = partitionInfo.getPartitionId(); + + InputChannel current = inputChannels.get(partitionId); + + if (current.getClass() == UnknownInputChannel.class) { + UnknownInputChannel unknownChannel = (UnknownInputChannel) current; + + InputChannel newChannel; + + if (partitionInfo.getProducerLocation() == PartitionLocation.REMOTE) { + newChannel = unknownChannel.toRemoteInputChannel(partitionInfo.getProducerAddress()); + } + else if (partitionInfo.getProducerLocation() == PartitionLocation.LOCAL) { + newChannel = unknownChannel.toLocalInputChannel(); + } + else { + throw new IllegalStateException("Tried to update unknown channel with unknown channel."); + } + + inputChannels.put(partitionId, newChannel); + + newChannel.requestIntermediateResultPartition(queueToRequest); + } + } + } + + // ------------------------------------------------------------------------ + // Consume + // ------------------------------------------------------------------------ + + void requestPartitionsOnce() throws IOException { + if (!hasRequestedPartitions) { + // Sanity check + if (totalNumberOfInputChannels != inputChannels.size()) { + throw new IllegalStateException("Mismatch between number of total input channels and the currently number of set input channels."); + } + + synchronized (requestLock) { + for (InputChannel inputChannel : inputChannels.values()) { + inputChannel.requestIntermediateResultPartition(queueToRequest); + } + } + + hasRequestedPartitions = true; + } + } + + @Override + public Buffer getNextBuffer() throws IOException, InterruptedException { + requestPartitionsOnce(); + + while (true) { + if (Thread.interrupted()) { + throw new InterruptedException(); + } + + // Possibly block until data is available at one of the input channels + InputChannel currentChannel = null; + while (currentChannel == null) { + currentChannel = inputChannelsWithData.poll(2000, TimeUnit.MILLISECONDS); + } + + isTaskEvent = false; + + final Buffer buffer = currentChannel.getNextBuffer(); + + if (buffer == null) { + throw new IllegalStateException("Bug in reader logic: queried for a buffer although none was available."); + } + + if (buffer.isBuffer()) { + channelIndexOfLastReadBuffer = currentChannel.getChannelIndex(); + return buffer; + } + else { + try { + final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader()); + + // ------------------------------------------------------------ + // Runtime events + // ------------------------------------------------------------ + // Note: We can not assume that every channel will be finished + // with an according event. In failure cases or iterations the + // consumer task finishes earlier and has to release all + // resources. + // ------------------------------------------------------------ + if (event.getClass() == EndOfPartitionEvent.class) { + currentChannel.releaseAllResources(); + + return null; + } + else if (event.getClass() == EndOfSuperstepEvent.class) { + incrementEndOfSuperstepEventAndCheck(); + + return null; + } + // ------------------------------------------------------------ + // Task events (user) + // ------------------------------------------------------------ + else if (event instanceof TaskEvent) { + taskEventHandler.publish((TaskEvent) event); + + isTaskEvent = true; + + return null; + } + else { + throw new IllegalStateException("Received unexpected event " + event + " from input channel " + currentChannel + "."); + } + } + catch (Throwable t) { + throw new IOException("Error while reading event: " + t.getMessage(), t); + } + finally { + buffer.recycle(); + } + } + } + } + + @Override + public Buffer getNextBuffer(Buffer exchangeBuffer) { + throw new UnsupportedOperationException("Buffer exchange when reading data is not yet supported."); + } + + @Override + public int getChannelIndexOfLastBuffer() { + return channelIndexOfLastReadBuffer; + } + + @Override + public boolean isTaskEvent() { + return isTaskEvent; + } + + @Override + public boolean isFinished() { + synchronized (requestLock) { + for (InputChannel inputChannel : inputChannels.values()) { + if (!inputChannel.isReleased()) { + return false; + } + } + } + + return true; + } + + public void releaseAllResources() throws IOException { + synchronized (requestLock) { + if (!isReleased) { + try { + for (InputChannel inputChannel : inputChannels.values()) { + try { + inputChannel.releaseAllResources(); + } + catch (IOException e) { + LOG.warn("Error during release of channel resources: " + e.getMessage(), e); + } + } + + // The buffer pool can actually be destroyed immediately after the + // reader received all of the data from the input channels. + if (bufferPool != null) { + bufferPool.destroy(); + } + } + finally { + isReleased = true; + } + } + } + } + + // ------------------------------------------------------------------------ + // Channel notifications + // ------------------------------------------------------------------------ + + public void onAvailableInputChannel(InputChannel inputChannel) { + inputChannelsWithData.add(inputChannel); + + if (readerListener.get() != null) { + readerListener.get().onEvent(this); + } + } + + void subscribeToReader(EventListener<BufferReader> listener) { + if (!this.readerListener.compareAndSet(null, listener)) { + throw new IllegalStateException(listener + " is already registered as a record availability listener"); + } + } + + // ------------------------------------------------------------------------ + // Task events + // ------------------------------------------------------------------------ + + @Override + public void sendTaskEvent(TaskEvent event) throws IOException, InterruptedException { + // This can be improved by just serializing the event once for all + // remote input channels. + synchronized (requestLock) { + for (InputChannel inputChannel : inputChannels.values()) { + inputChannel.sendTaskEvent(event); + } + } + } + + @Override + public void subscribeToTaskEvent(EventListener<TaskEvent> listener, Class<? extends TaskEvent> eventType) { + taskEventHandler.subscribe(listener, eventType); + } + + // ------------------------------------------------------------------------ + // Iteration end of superstep events + // ------------------------------------------------------------------------ + + @Override + public void setIterativeReader() { + isIterativeReader = true; + } + + @Override + public void startNextSuperstep() { + checkState(isIterativeReader, "Tried to start next superstep in a non-iterative reader."); + checkState(currentNumEndOfSuperstepEvents == totalNumberOfInputChannels, + "Tried to start next superstep before reaching end of previous superstep."); + + currentNumEndOfSuperstepEvents = 0; + } + + @Override + public boolean hasReachedEndOfSuperstep() { + return currentNumEndOfSuperstepEvents == totalNumberOfInputChannels; + } + + private boolean incrementEndOfSuperstepEventAndCheck() { + checkState(isIterativeReader, "Received end of superstep event in a non-iterative reader."); + + currentNumEndOfSuperstepEvents++; + + checkState(currentNumEndOfSuperstepEvents <= totalNumberOfInputChannels, + "Received too many (" + currentNumEndOfSuperstepEvents + ") end of superstep events."); + + return currentNumEndOfSuperstepEvents == totalNumberOfInputChannels; + } + + // ------------------------------------------------------------------------ + + @Override + public String toString() { + return String.format("BufferReader %s [task: %s, current/total number of input channels: %d/%d]", + consumedResultId, getTaskNameWithSubtasks(), inputChannels.size(), totalNumberOfInputChannels); + } + + public static BufferReader create(RuntimeEnvironment runtimeEnvironment, NetworkEnvironment networkEnvironment, PartitionConsumerDeploymentDescriptor desc) { + // The consumed intermediate data set (all partitions are part of this data set) + final IntermediateDataSetID resultId = desc.getResultId(); + + // The queue to request from each consumed partition + final int queueIndex = desc.getQueueIndex(); + + // There is one input channel for each consumed partition + final PartitionInfo[] partitions = desc.getPartitions(); + final int numberOfInputChannels = partitions.length; + + final BufferReader reader = new BufferReader(runtimeEnvironment, networkEnvironment, resultId, numberOfInputChannels, queueIndex); + + // Create input channels + final InputChannel[] inputChannels = new InputChannel[numberOfInputChannels]; + + int channelIndex = 0; + + for (PartitionInfo partition : partitions) { + final ExecutionAttemptID producerExecutionId = partition.getProducerExecutionId(); + final IntermediateResultPartitionID partitionId = partition.getPartitionId(); + + final PartitionLocation producerLocation = partition.getProducerLocation(); + + switch (producerLocation) { + case LOCAL: + inputChannels[channelIndex] = new LocalInputChannel(channelIndex, producerExecutionId, partitionId, reader); + break; + + case REMOTE: + final RemoteAddress producerAddress = checkNotNull(partition.getProducerAddress(), "Missing producer address for remote intermediate result partition."); + + inputChannels[channelIndex] = new RemoteInputChannel(channelIndex, producerExecutionId, partitionId, reader, producerAddress); + break; + + case UNKNOWN: + inputChannels[channelIndex] = new UnknownInputChannel(channelIndex, producerExecutionId, partitionId, reader); + break; + } + + reader.setInputChannel(partitionId, inputChannels[channelIndex]); + + channelIndex++; + } + + return reader; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java new file mode 100644 index 0000000..04fae71 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api.reader; + +import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.partition.consumer.InputChannel; +import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel; + +import java.io.IOException; + +/** + * A buffer-oriented runtime result reader. + * <p> + * {@link BufferReaderBase} is the runtime API for consuming results. Events + * are handled by the reader and users can query for buffers with + * {@link #getNextBuffer()} or {@link #getNextBuffer(Buffer)}. + * <p> + * <strong>Important</strong>: If {@link #getNextBuffer()} is used, it is + * necessary to release the returned buffers with {@link Buffer#recycle()} + * after they are consumed. + */ +public interface BufferReaderBase extends ReaderBase { + + /** + * Returns the next queued {@link Buffer} from one of the {@link RemoteInputChannel} + * instances attached to this reader. The are no ordering guarantees with + * respect to which channel is queried for data. + * <p> + * <strong>Important</strong>: it is necessary to release buffers, which + * are returned by the reader via {@link Buffer#recycle()}, because they + * are a pooled resource. If not recycled, the network stack will run out + * of buffers and deadlock. + * + * @see #getChannelIndexOfLastBuffer() + */ + Buffer getNextBuffer() throws IOException, InterruptedException; + + /** + * {@link #getNextBuffer()} requires the user to quickly recycle the + * returned buffer. For a fully buffer-oriented runtime, we need to + * support a variant of this method, which allows buffers to be exchanged + * in order to save unnecessary memory copies between buffer pools. + * <p> + * Currently this is not a problem, because the only "users" of the buffer- + * oriented API are the record-oriented readers, which immediately + * deserialize the buffer and recycle it. + */ + Buffer getNextBuffer(Buffer exchangeBuffer) throws IOException, InterruptedException; + + /** + * Returns a channel index for the last {@link Buffer} instance returned by + * {@link #getNextBuffer()} or {@link #getNextBuffer(Buffer)}. + * <p> + * The returned index is guaranteed to be the same for all buffers read by + * the same {@link RemoteInputChannel} instance. This is useful when data spans + * multiple buffers returned by this reader. + * <p> + * Initially returns <code>-1</code> and if multiple readers are unioned, + * the local channel indexes are mapped to the sequence from 0 to n-1. + */ + int getChannelIndexOfLastBuffer(); + + /** + * Returns the total number of {@link InputChannel} instances, from which this + * reader gets its data. + */ + int getNumberOfInputChannels(); + + boolean isTaskEvent(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java new file mode 100644 index 0000000..e47982e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api.reader; + +import java.io.IOException; + +import org.apache.flink.core.io.IOReadableWritable; + +/** + * A record-oriented reader for mutable record types. + */ +public interface MutableReader<T extends IOReadableWritable> extends ReaderBase { + + boolean next(T target) throws IOException, InterruptedException; + + void clearBuffers(); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java new file mode 100644 index 0000000..75d4f21 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api.reader; + +import org.apache.flink.core.io.IOReadableWritable; + +import java.io.IOException; + +public class MutableRecordReader<T extends IOReadableWritable> extends AbstractRecordReader<T> implements MutableReader<T> { + + public MutableRecordReader(BufferReaderBase reader) { + super(reader); + } + + @Override + public boolean next(final T target) throws IOException, InterruptedException { + return getNextRecord(target); + } + + @Override + public void clearBuffers() { + super.clearBuffers(); + } +}