[14/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java new file mode 100644 index 000..7a529b9 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network; + +import com.google.common.collect.HashBasedTable; +import com.google.common.collect.Table; +import org.apache.flink.runtime.event.task.TaskEvent; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.network.api.writer.BufferWriter; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.util.event.EventListener; + +import java.util.ArrayList; +import java.util.List; + +/** + * The task event dispatcher dispatches events flowing backwards from a consumer + * to a producer. 
It only supports programs, where the producer and consumer + * are running at the same time. + * <p> + * The publish method is either called from the local input channel or the + * network I/O thread. + */ +public class TaskEventDispatcher { + + Table<ExecutionAttemptID, IntermediateResultPartitionID, BufferWriter> registeredWriters = HashBasedTable.create(); + + public void registerWriterForIncomingTaskEvents(ExecutionAttemptID executionId, IntermediateResultPartitionID partitionId, BufferWriter listener) { + synchronized (registeredWriters) { + if (registeredWriters.put(executionId, partitionId, listener) != null) { + throw new IllegalStateException("Event dispatcher already contains buffer writer."); + } + } + } + + public void unregisterWriters(ExecutionAttemptID executionId) { + synchronized (registeredWriters) { + List<IntermediateResultPartitionID> writersToUnregister = new ArrayList<IntermediateResultPartitionID>(); + + for (IntermediateResultPartitionID partitionId : registeredWriters.row(executionId).keySet()) { + writersToUnregister.add(partitionId); + } + + for(IntermediateResultPartitionID partitionId : writersToUnregister) { + registeredWriters.remove(executionId, partitionId); + } + } + } + + /** +* Publishes the event to the registered {@link EventListener} instance. +* <p> +* This method is either called from a local input channel or the network +* I/O thread on behalf of a remote input channel. 
+*/ + public boolean publish(ExecutionAttemptID executionId, IntermediateResultPartitionID partitionId, TaskEvent event) { + EventListener<TaskEvent> listener = registeredWriters.get(executionId, partitionId); + + if (listener != null) { + listener.onEvent(event); + return true; + } + + return false; + } + + int getNumberOfRegisteredWriters() { + synchronized (registeredWriters) { + return registeredWriters.size(); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractRecordReader.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractRecordReader.java deleted file mode 100644 index 3af9aef..000 ---
[13/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java new file mode 100644 index 000..7255390 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api.reader; + +import java.io.IOException; + +import org.apache.flink.core.io.IOReadableWritable; + +/** + * A record-oriented reader for immutable record types. 
+ */ +public interface Reader<T extends IOReadableWritable> extends ReaderBase { + + boolean hasNext() throws IOException, InterruptedException; + + T next() throws IOException, InterruptedException; + + void clearBuffers(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java new file mode 100644 index 000..bb6ec44 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.api.reader; + +import java.io.IOException; + +import org.apache.flink.runtime.event.task.TaskEvent; +import org.apache.flink.runtime.util.event.EventListener; + +/** + * The basic API every reader (both buffer- and record-oriented) has to support. 
+ */ +public interface ReaderBase { + + // + // Properties + // + + boolean isFinished(); + + // + // Events + // + + void subscribeToTaskEvent(EventListener<TaskEvent> eventListener, Class<? extends TaskEvent> eventType); + + void sendTaskEvent(TaskEvent event) throws IOException, InterruptedException; + + // + // Iterations + // + + void setIterativeReader(); + + void startNextSuperstep(); + + boolean hasReachedEndOfSuperstep(); + +} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java new file mode 100644 index
[06/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java index 95e6da5..c7cde92 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java @@ -89,11 +89,11 @@ public class ExecutionGraphConstructionTest { v4.setParallelism(11); v5.setParallelism(4); - v2.connectNewDataSetAsInput(v1, DistributionPattern.BIPARTITE); - v4.connectNewDataSetAsInput(v2, DistributionPattern.BIPARTITE); - v4.connectNewDataSetAsInput(v3, DistributionPattern.BIPARTITE); - v5.connectNewDataSetAsInput(v4, DistributionPattern.BIPARTITE); - v5.connectNewDataSetAsInput(v3, DistributionPattern.BIPARTITE); + v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL); + v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL); + v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL); + v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL); + v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL); ListAbstractJobVertex ordered = new ArrayListAbstractJobVertex(Arrays.asList(v1, v2, v3, v4, v5)); @@ -125,7 +125,7 @@ public class ExecutionGraphConstructionTest { v3.setParallelism(2); // this creates an intermediate result for v1 - v2.connectNewDataSetAsInput(v1, DistributionPattern.BIPARTITE); + v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL); // create results for v2 and v3 IntermediateDataSet v2result = v2.createAndAddResultDataSet(); @@ -151,10 +151,10 @@ public class ExecutionGraphConstructionTest { v4.setParallelism(11); v5.setParallelism(4); - 
v4.connectDataSetAsInput(v2result, DistributionPattern.BIPARTITE); - v4.connectDataSetAsInput(v3result_1, DistributionPattern.BIPARTITE); - v5.connectNewDataSetAsInput(v4, DistributionPattern.BIPARTITE); - v5.connectDataSetAsInput(v3result_2, DistributionPattern.BIPARTITE); + v4.connectDataSetAsInput(v2result, DistributionPattern.ALL_TO_ALL); + v4.connectDataSetAsInput(v3result_1, DistributionPattern.ALL_TO_ALL); + v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL); + v5.connectDataSetAsInput(v3result_2, DistributionPattern.ALL_TO_ALL); ListAbstractJobVertex ordered2 = new ArrayListAbstractJobVertex(Arrays.asList(v4, v5)); @@ -186,7 +186,7 @@ public class ExecutionGraphConstructionTest { v3.setParallelism(2); // this creates an intermediate result for v1 - v2.connectNewDataSetAsInput(v1, DistributionPattern.BIPARTITE); + v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL); // create results for v2 and v3 IntermediateDataSet v2result = v2.createAndAddResultDataSet(); @@ -212,10 +212,10 @@ public class ExecutionGraphConstructionTest { v4.setParallelism(11); v5.setParallelism(4); - v4.connectIdInput(v2result.getId(), DistributionPattern.BIPARTITE); - v4.connectIdInput(v3result_1.getId(), DistributionPattern.BIPARTITE); - v5.connectNewDataSetAsInput(v4, DistributionPattern.BIPARTITE); - v5.connectIdInput(v3result_2.getId(), DistributionPattern.BIPARTITE); + v4.connectIdInput(v2result.getId(), DistributionPattern.ALL_TO_ALL); + v4.connectIdInput(v3result_1.getId(), DistributionPattern.ALL_TO_ALL); + v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL); + v5.connectIdInput(v3result_2.getId(), DistributionPattern.ALL_TO_ALL); ListAbstractJobVertex ordered2 = new ArrayListAbstractJobVertex(Arrays.asList(v4, v5)); @@ -298,7 +298,7 @@ public class ExecutionGraphConstructionTest { int sumOfPartitions = 0; for (ExecutionEdge inEdge : inputs) {
[11/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java deleted file mode 100644 index 3bca0a4..000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java +++ /dev/null @@ -1,384 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an AS IS BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.io.network.gates; - -import java.io.IOException; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicReference; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.deployment.ChannelDeploymentDescriptor; -import org.apache.flink.runtime.deployment.GateDeploymentDescriptor; -import org.apache.flink.runtime.event.task.AbstractEvent; -import org.apache.flink.runtime.event.task.AbstractTaskEvent; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.io.network.Buffer; -import org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener; -import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider; -import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool; -import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPool; -import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPoolOwner; -import org.apache.flink.runtime.io.network.channels.InputChannel; -import org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.runtime.jobgraph.JobID; - -/** - * Input gates are a specialization of general gates and connect input channels and record readers. As - * channels, input gates are always parameterized to a specific type of record which they can transport. In contrast to - * output gates input gates can be associated with a {@link DistributionPattern} object which dictates the concrete - * wiring between two groups of vertices. - * - * @param T The type of record that can be transported through this gate. - */ -public class InputGateT extends IOReadableWritable extends GateT implements BufferProvider, LocalBufferPoolOwner { - - /** -* The log object used for debugging. 
-*/ - private static final Logger LOG = LoggerFactory.getLogger(InputGate.class); - - /** -* The array of input channels attached to this input gate. -*/ - private InputChannelT[] channels; - - /** -* Queue with indices of channels that store at least one available record. -*/ - private final BlockingQueueInteger availableChannels = new LinkedBlockingQueueInteger(); - - /** -* The listener object to be notified when a channel has at least one record available. -*/ - private final AtomicReferenceRecordAvailabilityListenerT recordAvailabilityListener = new AtomicReferenceRecordAvailabilityListenerT(null); - - - private AbstractTaskEvent currentEvent; - - /** -* If the value of this variable is set to codetrue/code, the input gate is closed. -*/ - private boolean isClosed = false; - - /** -* The channel to read from next. -*/ - private int channelToReadFrom = -1; - - private LocalBufferPool bufferPool; - - /** -* Constructs a new runtime input gate. -* -* @param jobID -*the ID of the job this input gate belongs to -* @param gateID -*the ID of the gate -* @param index -*the index assigned to this input gate at the {@link Environment} object -*/ - public InputGate(final JobID jobID, final GateID gateID, final int index) { - super(jobID, gateID, index); - } - -
[15/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java index 1f79067..7de8651 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java @@ -18,15 +18,15 @@ package org.apache.flink.runtime.io.disk.iomanager; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.util.EnvironmentInformation; + import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.util.EnvironmentInformation; - -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkState; /** * A version of the {@link IOManager} that uses asynchronous I/O. 
@@ -181,13 +181,13 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID, LinkedBlockingQueueMemorySegment returnQueue) throws IOException { - Preconditions.checkState(!shutdown, I/O-Manger is closed.); + checkState(!shutdown, I/O-Manger is closed.); return new AsynchronousBlockWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue); } @Override - public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException { - Preconditions.checkState(!shutdown, I/O-Manger is closed.); + public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallbackMemorySegment callback) throws IOException { + checkState(!shutdown, I/O-Manger is closed.); return new AsynchronousBlockWriterWithCallback(channelID, this.writers[channelID.getThreadNum()].requestQueue, callback); } @@ -205,7 +205,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID, LinkedBlockingQueueMemorySegment returnQueue) throws IOException { - Preconditions.checkState(!shutdown, I/O-Manger is closed.); + checkState(!shutdown, I/O-Manger is closed.); return new AsynchronousBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue); } @@ -228,7 +228,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle public BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID, ListMemorySegment targetSegments, int numBlocks) throws IOException { - Preconditions.checkState(!shutdown, I/O-Manger is closed.); + checkState(!shutdown, I/O-Manger is closed.); return new AsynchronousBulkBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java index 1b7adae..69791f4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java @@ -33,7 +33,6 @@ interface IORequest { public void requestDone(IOException ioex); } - /** * Interface for I/O requests that are handled by the IOManager's reading thread. */ @@ -47,7 +46,6 @@ interface ReadRequest extends IORequest { public void read() throws IOException; } - /** * Interface for I/O requests that are handled by the IOManager's writing thread. */ http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
[18/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results
[FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results This closes #254. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d908ca19 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d908ca19 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d908ca19 Branch: refs/heads/master Commit: d908ca19741bf2561cb6a7663541f642e60c0e6d Parents: c8c50be Author: Ufuk Celebi u...@apache.org Authored: Wed Oct 8 11:40:31 2014 +0200 Committer: Ufuk Celebi u...@apache.org Committed: Mon Jan 12 09:15:12 2015 +0100 -- .../flink/streaming/api/JobGraphBuilder.java| 2 +- .../api/collector/DirectedStreamCollector.java | 2 +- .../api/collector/StreamCollector.java | 2 +- .../api/streamvertex/CoStreamVertex.java| 83 +- .../api/streamvertex/InputHandler.java | 16 +- .../api/streamvertex/OutputHandler.java | 22 +- .../api/streamvertex/StreamIterationHead.java | 4 +- .../api/streamvertex/StreamVertex.java | 3 +- .../streamvertex/StreamingRuntimeContext.java | 2 +- .../flink/streaming/io/CoRecordReader.java | 223 ++--- .../flink/streaming/io/StreamRecordWriter.java | 117 +-- .../partitioner/StreamPartitioner.java | 2 +- .../api/collector/DirectedOutputTest.java | 22 +- .../streaming/api/streamrecord/UIDTest.java | 4 +- .../api/streamvertex/MockRecordWriter.java | 4 +- .../plantranslate/NepheleJobGraphGenerator.java | 4 +- .../compiler/BranchingPlansCompilerTest.java| 12 +- .../apache/flink/core/memory/MemorySegment.java | 3 +- .../org/apache/flink/types/BooleanValue.java| 3 +- .../examples/java/wordcount/WordCount.java | 4 +- .../java/org/apache/flink/api/java/DataSet.java | 12 +- flink-runtime/pom.xml | 2 +- .../broadcast/BroadcastVariableManager.java | 2 +- .../BroadcastVariableMaterialization.java | 6 +- .../deployment/ChannelDeploymentDescriptor.java | 86 -- .../deployment/GateDeploymentDescriptor.java| 80 -- .../PartitionConsumerDeploymentDescriptor.java | 94 ++ 
.../PartitionDeploymentDescriptor.java | 116 +++ .../flink/runtime/deployment/PartitionInfo.java | 159 .../deployment/TaskDeploymentDescriptor.java| 159 ++-- .../runtime/event/task/AbstractTaskEvent.java | 29 - .../flink/runtime/event/task/EventListener.java | 36 - .../event/task/EventNotificationManager.java| 108 --- .../runtime/event/task/IntegerTaskEvent.java| 2 +- .../flink/runtime/event/task/RuntimeEvent.java | 22 + .../runtime/event/task/StringTaskEvent.java | 2 +- .../flink/runtime/event/task/TaskEvent.java | 22 + .../flink/runtime/execution/Environment.java| 147 +--- .../runtime/execution/RuntimeEnvironment.java | 732 .../flink/runtime/executiongraph/Execution.java | 254 -- .../executiongraph/ExecutionAttemptID.java | 16 +- .../runtime/executiongraph/ExecutionEdge.java | 34 +- .../runtime/executiongraph/ExecutionGraph.java | 311 ++- .../runtime/executiongraph/ExecutionVertex.java | 181 ++-- .../executiongraph/IntermediateResult.java | 52 +- .../IntermediateResultPartition.java| 55 +- .../disk/iomanager/AsynchronousBlockReader.java | 17 +- .../AsynchronousBlockWriterWithCallback.java| 17 +- .../iomanager/AsynchronousBulkBlockReader.java | 17 +- .../iomanager/AsynchronousFileIOChannel.java| 55 +- .../runtime/io/disk/iomanager/IOManager.java| 73 +- .../io/disk/iomanager/IOManagerAsync.java | 18 +- .../runtime/io/disk/iomanager/IORequest.java| 2 - .../io/disk/iomanager/QueuingCallback.java | 2 +- .../io/disk/iomanager/RequestDoneCallback.java | 16 +- .../apache/flink/runtime/io/network/Buffer.java | 98 --- .../runtime/io/network/BufferRecycler.java | 32 - .../runtime/io/network/ChannelManager.java | 692 --- .../network/ConnectionInfoLookupResponse.java | 111 --- .../runtime/io/network/ConnectionManager.java | 42 + .../flink/runtime/io/network/Envelope.java | 183 .../runtime/io/network/EnvelopeDispatcher.java | 52 -- .../io/network/EnvelopeReceiverList.java| 81 -- .../network/InsufficientResourcesException.java | 43 - .../io/network/LocalConnectionManager.java | 
19 +- .../LocalReceiverCancelledException.java| 41 - .../io/network/NetworkConnectionManager.java| 35 - .../runtime/io/network/NetworkEnvironment.java | 277 ++ .../flink/runtime/io/network/RemoteAddress.java | 122 +++ .../runtime/io/network/RemoteReceiver.java | 150 .../runtime/io/network/SenderHintEvent.java | 122 --- .../runtime/io/network/TaskEventDispatcher.java | 88 ++ .../io/network/api/AbstractRecordReader.java| 104 ---
flink git commit: Undo erroneous commit 86f87537
Repository: flink Updated Branches: refs/heads/master 9f07373f3 - aba76171f Undo erroneous commit 86f87537 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aba76171 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aba76171 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aba76171 Branch: refs/heads/master Commit: aba76171fef41e2c987913c32fefafc55ef635f6 Parents: 9f07373 Author: Robert Metzger rmetz...@apache.org Authored: Mon Jan 12 13:26:57 2015 +0100 Committer: Robert Metzger rmetz...@apache.org Committed: Mon Jan 12 13:26:57 2015 +0100 -- flink-quickstart/quickstart-scala.sh | 2 +- flink-quickstart/quickstart.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/aba76171/flink-quickstart/quickstart-scala.sh -- diff --git a/flink-quickstart/quickstart-scala.sh b/flink-quickstart/quickstart-scala.sh index 019f6b0..0690f3c 100755 --- a/flink-quickstart/quickstart-scala.sh +++ b/flink-quickstart/quickstart-scala.sh @@ -24,7 +24,7 @@ PACKAGE=quickstart mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-scala \ - -DarchetypeVersion=0.8.0 \ + -DarchetypeVersion=0.7.0-incubating \ -DgroupId=org.apache.flink \ -DartifactId=$PACKAGE \ -Dversion=0.1 \ http://git-wip-us.apache.org/repos/asf/flink/blob/aba76171/flink-quickstart/quickstart.sh -- diff --git a/flink-quickstart/quickstart.sh b/flink-quickstart/quickstart.sh index 92309d4..568ccc6 100755 --- a/flink-quickstart/quickstart.sh +++ b/flink-quickstart/quickstart.sh @@ -24,7 +24,7 @@ PACKAGE=quickstart mvn archetype:generate \ -DarchetypeGroupId=org.apache.flink \ -DarchetypeArtifactId=flink-quickstart-java \ - -DarchetypeVersion=0.8.0 \ + -DarchetypeVersion=0.7.0-incubating \ -DgroupId=org.apache.flink \ -DartifactId=$PACKAGE \ -Dversion=0.1 \
[5/5] flink git commit: [dist] Updated mailing list info in binary distribution
[dist] Updated mailing list info in binary distribution Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b4e9db09 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b4e9db09 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b4e9db09 Branch: refs/heads/release-0.8 Commit: b4e9db091ee95348dc85a27fee87d27edc4dde45 Parents: 4f95ce7 Author: mbalassi mbala...@apache.org Authored: Mon Jan 12 10:50:55 2015 +0100 Committer: mbalassi mbala...@apache.org Committed: Mon Jan 12 10:50:55 2015 +0100 -- flink-dist/src/main/flink-bin/README.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/b4e9db09/flink-dist/src/main/flink-bin/README.txt -- diff --git a/flink-dist/src/main/flink-bin/README.txt b/flink-dist/src/main/flink-bin/README.txt index 46fab4d..4ebee19 100644 --- a/flink-dist/src/main/flink-bin/README.txt +++ b/flink-dist/src/main/flink-bin/README.txt @@ -8,8 +8,8 @@ and our GitHub Account If you have any questions, ask on our Mailing lists: - u...@flink.incubator.apache.org - d...@flink.incubator.apache.org + u...@flink.apache.org + d...@flink.apache.org This distribution includes cryptographic software. The country in which you currently reside may have restrictions on the import,
[1/5] flink git commit: [docs] Prepare documentation for 0.8 release
Repository: flink Updated Branches: refs/heads/release-0.8 ada35eb05 - b4e9db091 [docs] Prepare documentation for 0.8 release Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7cd0f477 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7cd0f477 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7cd0f477 Branch: refs/heads/release-0.8 Commit: 7cd0f47787d37add0ae6b87b7dd0dd454dfb04cb Parents: ada35eb Author: Robert Metzger metzg...@web.de Authored: Sat Jan 10 12:21:56 2015 +0100 Committer: mbalassi mbala...@apache.org Committed: Mon Jan 12 09:57:32 2015 +0100 -- docs/README.md | 2 +- docs/_config.yml | 10 +- docs/cli.md | 12 ++-- docs/cluster_execution.md| 4 ++-- docs/example_connectors.md | 8 docs/hadoop_compatibility.md | 2 +- docs/index.md| 4 ++-- docs/java_api_quickstart.md | 2 +- docs/local_execution.md | 2 +- docs/programming_guide.md| 12 ++-- docs/scala_api_quickstart.md | 2 +- docs/setup_quickstart.md | 8 docs/spargel_guide.md| 2 +- docs/streaming_guide.md | 2 +- docs/yarn_setup.md | 14 +++--- 15 files changed, 43 insertions(+), 43 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/7cd0f477/docs/README.md -- diff --git a/docs/README.md b/docs/README.md index 8a19034..6dc76f2 100644 --- a/docs/README.md +++ b/docs/README.md @@ -44,7 +44,7 @@ title of the page. This title is used as the top-level heading for the page. Furthermore, you can access variables found in `docs/_config.yml` as follows: -{{ site.FLINK_VERSION_STABLE }} +{{ site.FLINK_VERSION_SHORT }} This will be replaced with the value of the variable when generating the docs. 
http://git-wip-us.apache.org/repos/asf/flink/blob/7cd0f477/docs/_config.yml -- diff --git a/docs/_config.yml b/docs/_config.yml index f9e3690..0fccf8c 100644 --- a/docs/_config.yml +++ b/docs/_config.yml @@ -22,9 +22,9 @@ # {{ site.CONFIG_KEY }} #-- -FLINK_VERSION_STABLE: 0.7.0-incubating # this variable can point to a SNAPSHOT version in the git source. -FLINK_VERSION_SHORT: 0.7.0 -FLINK_VERSION_HADOOP_2_STABLE: 0.7-hadoop2-incubating +FLINK_VERSION_HADOOP1_STABLE: 0.8.0-hadoop1 # this variable can point to a SNAPSHOT version in the git source. +FLINK_VERSION_SHORT: 0.8.0 +FLINK_VERSION_HADOOP2_STABLE: 0.8.0 FLINK_SCALA_VERSION: 2.10.4 FLINK_SCALA_VERSION_SHORT: 2.10 FLINK_ISSUES_URL: https://issues.apache.org/jira/browse/FLINK @@ -33,8 +33,8 @@ FLINK_GITHUB_URL: https://github.com/apache/incubator-flink FLINK_WEBSITE_URL: http://flink.incubator.apache.org FLINK_DOWNLOAD_URL: http://flink.incubator.apache.org/downloads.html -FLINK_DOWNLOAD_URL_HADOOP_1_STABLE: http://www.apache.org/dyn/closer.cgi/incubator/flink/flink-0.7.0-incubating/flink-0.7.0-incubating-bin-hadoop1.tgz -FLINK_DOWNLOAD_URL_HADOOP_2_STABLE: http://www.apache.org/dyn/closer.cgi/incubator/flink/flink-0.7.0-incubating/flink-0.7.0-incubating-bin-hadoop2.tgz +FLINK_DOWNLOAD_URL_HADOOP1_STABLE: http://www.apache.org/dyn/closer.cgi/incubator/flink/flink-0.7.0-incubating/flink-0.7.0-incubating-bin-hadoop1.tgz +FLINK_DOWNLOAD_URL_HADOOP2_STABLE: http://www.apache.org/dyn/closer.cgi/incubator/flink/flink-0.7.0-incubating/flink-0.7.0-incubating-bin-hadoop2.tgz FLINK_DOWNLOAD_URL_YARN_STABLE: http://www.apache.org/dyn/closer.cgi/incubator/flink/flink-0.7.0-incubating/flink-0.7.0-incubating-bin-hadoop2-yarn.tgz FLINK_WGET_URL_YARN_STABLE: http://artfiles.org/apache.org/incubator/flink/flink-0.7.0-incubating/flink-0.7.0-incubating-bin-hadoop2-yarn.tgz http://git-wip-us.apache.org/repos/asf/flink/blob/7cd0f477/docs/cli.md -- diff --git a/docs/cli.md b/docs/cli.md index 7db31a6..ceab6e4 100644 --- a/docs/cli.md 
+++ b/docs/cli.md @@ -46,33 +46,33 @@ The command line can be used to - Run example program with no arguments. -./bin/flink run ./examples/flink-java-examples-{{ site.FLINK_VERSION_STABLE }}-WordCount.jar +./bin/flink run ./examples/flink-java-examples-{{ site.FLINK_VERSION_SHORT }}-WordCount.jar - Run example program with arguments for input and result files -./bin/flink run ./examples/flink-java-examples-{{ site.FLINK_VERSION_STABLE }}-WordCount.jar \ +./bin/flink run ./examples/flink-java-examples-{{ site.FLINK_VERSION_SHORT }}-WordCount.jar \ file:///home/user/hamlet.txt
[3/5] flink git commit: [FLINK-1385] Print warning if no resources are _currently_ available in the YARN cluster, instead of rejecting the session
[FLINK-1385] Print warning if no resources are _currently_ available in the YARN cluster, instead of rejecting the session This closes #294 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e83ccd03 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e83ccd03 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e83ccd03 Branch: refs/heads/release-0.8 Commit: e83ccd03cde884626588d729f6dc9e4d624c9faf Parents: 7cd0f47 Author: Robert Metzger metzg...@web.de Authored: Sat Jan 10 18:20:10 2015 +0100 Committer: mbalassi mbala...@apache.org Committed: Mon Jan 12 09:57:33 2015 +0100 -- .../main/java/org/apache/flink/yarn/Client.java | 21 +--- 1 file changed, 9 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e83ccd03/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java -- diff --git a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java index 161aa8a..2100df9 100644 --- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java +++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java @@ -394,25 +394,22 @@ public class Client { yarnClient.stop(); System.exit(1); } + + final String NOTE = \nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are + + connecting from the beginning. The allocation might take more time than usual!; int totalMemoryRequired = jmMemory + tmMemory * taskManagerCount; ClusterResourceDescription freeClusterMem = getCurrentFreeClusterResources(yarnClient); if(freeClusterMem.totalFreeMemory totalMemoryRequired) { - LOG.error(This YARN session requires +totalMemoryRequired+MB of memory in the cluster. 
- + There are currently only +freeClusterMem.totalFreeMemory+MB available.); - yarnClient.stop(); - System.exit(1); + LOG.warn(This YARN session requires + totalMemoryRequired + MB of memory in the cluster. + + There are currently only + freeClusterMem.totalFreeMemory + MB available. + NOTE); } if( tmMemory freeClusterMem.containerLimit) { - LOG.error(The requested amount of memory for the TaskManagers (+tmMemory+MB) is more than - + the largest possible YARN container: +freeClusterMem.containerLimit); - yarnClient.stop(); - System.exit(1); + LOG.warn(The requested amount of memory for the TaskManagers ( + tmMemory + MB) is more than + + currently the largest possible YARN container: + freeClusterMem.containerLimit + NOTE); } if( jmMemory freeClusterMem.containerLimit) { - LOG.error(The requested amount of memory for the JobManager (+jmMemory+MB) is more than - + the largest possible YARN container: +freeClusterMem.containerLimit); - yarnClient.stop(); - System.exit(1); + LOG.warn(The requested amount of memory for the JobManager ( + jmMemory + MB) is more than + + currently the largest possible YARN container: + freeClusterMem.containerLimit + NOTE); } // respect custom JVM options in the YAML file