[14/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results

2015-01-12 Thread uce
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
new file mode 100644
index 000..7a529b9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.util.event.EventListener;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The task event dispatcher dispatches events flowing backwards from a consumer
+ * to a producer. It only supports programs, where the producer and consumer
+ * are running at the same time.
+ * <p>
+ * The publish method is either called from the local input channel or the
+ * network I/O thread.
+ */
+public class TaskEventDispatcher {
+
+   Table<ExecutionAttemptID, IntermediateResultPartitionID, BufferWriter> registeredWriters = HashBasedTable.create();
+
+   public void registerWriterForIncomingTaskEvents(ExecutionAttemptID 
executionId, IntermediateResultPartitionID partitionId, BufferWriter listener) {
+   synchronized (registeredWriters) {
+   if (registeredWriters.put(executionId, partitionId, 
listener) != null) {
+   throw new IllegalStateException("Event dispatcher already contains buffer writer.");
+   }
+   }
+   }
+
+   public void unregisterWriters(ExecutionAttemptID executionId) {
+   synchronized (registeredWriters) {
+   List<IntermediateResultPartitionID> writersToUnregister = new ArrayList<IntermediateResultPartitionID>();
+
+   for (IntermediateResultPartitionID partitionId : 
registeredWriters.row(executionId).keySet()) {
+   writersToUnregister.add(partitionId);
+   }
+
+   for(IntermediateResultPartitionID partitionId : 
writersToUnregister) {
+   registeredWriters.remove(executionId, 
partitionId);
+   }
+   }
+   }
+
+   /**
+* Publishes the event to the registered {@link EventListener} instance.
+* <p>
+* This method is either called from a local input channel or the 
network
+* I/O thread on behalf of a remote input channel.
+*/
+   public boolean publish(ExecutionAttemptID executionId, 
IntermediateResultPartitionID partitionId, TaskEvent event) {
+   EventListener<TaskEvent> listener = registeredWriters.get(executionId, partitionId);
+
+   if (listener != null) {
+   listener.onEvent(event);
+   return true;
+   }
+
+   return false;
+   }
+
+   int getNumberOfRegisteredWriters() {
+   synchronized (registeredWriters) {
+   return registeredWriters.size();
+   }
+   }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractRecordReader.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractRecordReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractRecordReader.java
deleted file mode 100644
index 3af9aef..000
--- 

[13/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results

2015-01-12 Thread uce
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java
new file mode 100644
index 000..7255390
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/Reader.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+
+/**
+ * A record-oriented reader for immutable record types.
+ */
+public interface Reader<T extends IOReadableWritable> extends ReaderBase {
+
+   boolean hasNext() throws IOException, InterruptedException;
+
+   T next() throws IOException, InterruptedException;
+
+   void clearBuffers();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
new file mode 100644
index 000..bb6ec44
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/ReaderBase.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import java.io.IOException;
+
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.util.event.EventListener;
+
+/**
+ * The basic API every reader (both buffer- and record-oriented) has to 
support.
+ */
+public interface ReaderBase {
+
+   // 

+   // Properties
+   // 

+
+   boolean isFinished();
+
+   // 

+   // Events
+   // 

+
+   void subscribeToTaskEvent(EventListener<TaskEvent> eventListener, Class<? extends TaskEvent> eventType);
+
+   void sendTaskEvent(TaskEvent event) throws IOException, 
InterruptedException;
+
+   // 

+   // Iterations
+   // 

+
+   void setIterativeReader();
+
+   void startNextSuperstep();
+
+   boolean hasReachedEndOfSuperstep();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/RecordReader.java
new file mode 100644
index 

[06/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results

2015-01-12 Thread uce
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
index 95e6da5..c7cde92 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphConstructionTest.java
@@ -89,11 +89,11 @@ public class ExecutionGraphConstructionTest {
v4.setParallelism(11);
v5.setParallelism(4);

-   v2.connectNewDataSetAsInput(v1, DistributionPattern.BIPARTITE);
-   v4.connectNewDataSetAsInput(v2, DistributionPattern.BIPARTITE);
-   v4.connectNewDataSetAsInput(v3, DistributionPattern.BIPARTITE);
-   v5.connectNewDataSetAsInput(v4, DistributionPattern.BIPARTITE);
-   v5.connectNewDataSetAsInput(v3, DistributionPattern.BIPARTITE);
+   v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
+   v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
+   v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
+   v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
+   v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);

List<AbstractJobVertex> ordered = new ArrayList<AbstractJobVertex>(Arrays.asList(v1, v2, v3, v4, v5));
 
@@ -125,7 +125,7 @@ public class ExecutionGraphConstructionTest {
v3.setParallelism(2);

// this creates an intermediate result for v1
-   v2.connectNewDataSetAsInput(v1, DistributionPattern.BIPARTITE);
+   v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);

// create results for v2 and v3
IntermediateDataSet v2result = v2.createAndAddResultDataSet();
@@ -151,10 +151,10 @@ public class ExecutionGraphConstructionTest {
v4.setParallelism(11);
v5.setParallelism(4);

-   v4.connectDataSetAsInput(v2result, 
DistributionPattern.BIPARTITE);
-   v4.connectDataSetAsInput(v3result_1, 
DistributionPattern.BIPARTITE);
-   v5.connectNewDataSetAsInput(v4, DistributionPattern.BIPARTITE);
-   v5.connectDataSetAsInput(v3result_2, 
DistributionPattern.BIPARTITE);
+   v4.connectDataSetAsInput(v2result, 
DistributionPattern.ALL_TO_ALL);
+   v4.connectDataSetAsInput(v3result_1, 
DistributionPattern.ALL_TO_ALL);
+   v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
+   v5.connectDataSetAsInput(v3result_2, 
DistributionPattern.ALL_TO_ALL);

List<AbstractJobVertex> ordered2 = new ArrayList<AbstractJobVertex>(Arrays.asList(v4, v5));

@@ -186,7 +186,7 @@ public class ExecutionGraphConstructionTest {
v3.setParallelism(2);

// this creates an intermediate result for v1
-   v2.connectNewDataSetAsInput(v1, DistributionPattern.BIPARTITE);
+   v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);

// create results for v2 and v3
IntermediateDataSet v2result = v2.createAndAddResultDataSet();
@@ -212,10 +212,10 @@ public class ExecutionGraphConstructionTest {
v4.setParallelism(11);
v5.setParallelism(4);

-   v4.connectIdInput(v2result.getId(), 
DistributionPattern.BIPARTITE);
-   v4.connectIdInput(v3result_1.getId(), 
DistributionPattern.BIPARTITE);
-   v5.connectNewDataSetAsInput(v4, DistributionPattern.BIPARTITE);
-   v5.connectIdInput(v3result_2.getId(), 
DistributionPattern.BIPARTITE);
+   v4.connectIdInput(v2result.getId(), 
DistributionPattern.ALL_TO_ALL);
+   v4.connectIdInput(v3result_1.getId(), 
DistributionPattern.ALL_TO_ALL);
+   v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
+   v5.connectIdInput(v3result_2.getId(), 
DistributionPattern.ALL_TO_ALL);

List<AbstractJobVertex> ordered2 = new ArrayList<AbstractJobVertex>(Arrays.asList(v4, v5));

@@ -298,7 +298,7 @@ public class ExecutionGraphConstructionTest {
int sumOfPartitions = 0;
for (ExecutionEdge inEdge : inputs) {

[11/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results

2015-01-12 Thread uce
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java
deleted file mode 100644
index 3bca0a4..000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java
+++ /dev/null
@@ -1,384 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.gates;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.deployment.ChannelDeploymentDescriptor;
-import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.AbstractTaskEvent;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.io.network.Buffer;
-import 
org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
-import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
-import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPool;
-import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
-import org.apache.flink.runtime.io.network.channels.InputChannel;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobID;
-
-/**
- * Input gates are a specialization of general gates and connect input 
channels and record readers. As
- * channels, input gates are always parameterized to a specific type of record 
which they can transport. In contrast to
- * output gates input gates can be associated with a {@link 
DistributionPattern} object which dictates the concrete
- * wiring between two groups of vertices.
- * 
- * @param <T> The type of record that can be transported through this gate.
- */
-public class InputGate<T extends IOReadableWritable> extends Gate<T> implements BufferProvider, LocalBufferPoolOwner {
-   
-   /**
-* The log object used for debugging.
-*/
-   private static final Logger LOG = 
LoggerFactory.getLogger(InputGate.class);
-
-   /**
-* The array of input channels attached to this input gate.
-*/
-   private InputChannel<T>[] channels;
-
-   /**
-* Queue with indices of channels that store at least one available 
record.
-*/
-   private final BlockingQueue<Integer> availableChannels = new LinkedBlockingQueue<Integer>();
-
-   /**
-* The listener object to be notified when a channel has at least one 
record available.
-*/
-   private final AtomicReference<RecordAvailabilityListener<T>> recordAvailabilityListener = new AtomicReference<RecordAvailabilityListener<T>>(null);
-   
-   
-   private AbstractTaskEvent currentEvent;
-
-   /**
-* If the value of this variable is set to <code>true</code>, the input gate is closed.
-*/
-   private boolean isClosed = false;
-
-   /**
-* The channel to read from next.
-*/
-   private int channelToReadFrom = -1;
-
-   private LocalBufferPool bufferPool;
-
-   /**
-* Constructs a new runtime input gate.
-* 
-* @param jobID
-*the ID of the job this input gate belongs to
-* @param gateID
-*the ID of the gate
-* @param index
-*the index assigned to this input gate at the {@link 
Environment} object
-*/
-   public InputGate(final JobID jobID, final GateID gateID, final int 
index) {
-   super(jobID, gateID, index);
-   }
-
-   

[15/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results

2015-01-12 Thread uce
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
index 1f79067..7de8651 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkState;
 
 /**
  * A version of the {@link IOManager} that uses asynchronous I/O.
@@ -181,13 +181,13 @@ public class IOManagerAsync extends IOManager implements 
UncaughtExceptionHandle
public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID 
channelID,

LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
{
-   Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+   checkState(!shutdown, "I/O-Manger is closed.");
return new AsynchronousBlockWriter(channelID, 
this.writers[channelID.getThreadNum()].requestQueue, returnQueue);
}

@Override
-   public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException {
-   Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+   public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException {
+   checkState(!shutdown, "I/O-Manger is closed.");
return new AsynchronousBlockWriterWithCallback(channelID, 
this.writers[channelID.getThreadNum()].requestQueue, callback);
}

@@ -205,7 +205,7 @@ public class IOManagerAsync extends IOManager implements 
UncaughtExceptionHandle
public BlockChannelReader createBlockChannelReader(FileIOChannel.ID 
channelID,

LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
{
-   Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+   checkState(!shutdown, "I/O-Manger is closed.");
return new AsynchronousBlockReader(channelID, 
this.readers[channelID.getThreadNum()].requestQueue, returnQueue);
}

@@ -228,7 +228,7 @@ public class IOManagerAsync extends IOManager implements 
UncaughtExceptionHandle
public BulkBlockChannelReader 
createBulkBlockChannelReader(FileIOChannel.ID channelID,
List<MemorySegment> targetSegments, int numBlocks) throws IOException
{
-   Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+   checkState(!shutdown, "I/O-Manger is closed.");
return new AsynchronousBulkBlockReader(channelID, 
this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks);
}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
index 1b7adae..69791f4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
@@ -33,7 +33,6 @@ interface IORequest {
public void requestDone(IOException ioex);
 }
 
-
 /**
  * Interface for I/O requests that are handled by the IOManager's reading 
thread. 
  */
@@ -47,7 +46,6 @@ interface ReadRequest extends IORequest {
public void read() throws IOException;
 }
 
-
 /**
  * Interface for I/O requests that are handled by the IOManager's writing 
thread.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java

[18/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results

2015-01-12 Thread uce
[FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for 
intermediate results

This closes #254.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d908ca19
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d908ca19
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d908ca19

Branch: refs/heads/master
Commit: d908ca19741bf2561cb6a7663541f642e60c0e6d
Parents: c8c50be
Author: Ufuk Celebi u...@apache.org
Authored: Wed Oct 8 11:40:31 2014 +0200
Committer: Ufuk Celebi u...@apache.org
Committed: Mon Jan 12 09:15:12 2015 +0100

--
 .../flink/streaming/api/JobGraphBuilder.java|   2 +-
 .../api/collector/DirectedStreamCollector.java  |   2 +-
 .../api/collector/StreamCollector.java  |   2 +-
 .../api/streamvertex/CoStreamVertex.java|  83 +-
 .../api/streamvertex/InputHandler.java  |  16 +-
 .../api/streamvertex/OutputHandler.java |  22 +-
 .../api/streamvertex/StreamIterationHead.java   |   4 +-
 .../api/streamvertex/StreamVertex.java  |   3 +-
 .../streamvertex/StreamingRuntimeContext.java   |   2 +-
 .../flink/streaming/io/CoRecordReader.java  | 223 ++---
 .../flink/streaming/io/StreamRecordWriter.java  | 117 +--
 .../partitioner/StreamPartitioner.java  |   2 +-
 .../api/collector/DirectedOutputTest.java   |  22 +-
 .../streaming/api/streamrecord/UIDTest.java |   4 +-
 .../api/streamvertex/MockRecordWriter.java  |   4 +-
 .../plantranslate/NepheleJobGraphGenerator.java |   4 +-
 .../compiler/BranchingPlansCompilerTest.java|  12 +-
 .../apache/flink/core/memory/MemorySegment.java |   3 +-
 .../org/apache/flink/types/BooleanValue.java|   3 +-
 .../examples/java/wordcount/WordCount.java  |   4 +-
 .../java/org/apache/flink/api/java/DataSet.java |  12 +-
 flink-runtime/pom.xml   |   2 +-
 .../broadcast/BroadcastVariableManager.java |   2 +-
 .../BroadcastVariableMaterialization.java   |   6 +-
 .../deployment/ChannelDeploymentDescriptor.java |  86 --
 .../deployment/GateDeploymentDescriptor.java|  80 --
 .../PartitionConsumerDeploymentDescriptor.java  |  94 ++
 .../PartitionDeploymentDescriptor.java  | 116 +++
 .../flink/runtime/deployment/PartitionInfo.java | 159 
 .../deployment/TaskDeploymentDescriptor.java| 159 ++--
 .../runtime/event/task/AbstractTaskEvent.java   |  29 -
 .../flink/runtime/event/task/EventListener.java |  36 -
 .../event/task/EventNotificationManager.java| 108 ---
 .../runtime/event/task/IntegerTaskEvent.java|   2 +-
 .../flink/runtime/event/task/RuntimeEvent.java  |  22 +
 .../runtime/event/task/StringTaskEvent.java |   2 +-
 .../flink/runtime/event/task/TaskEvent.java |  22 +
 .../flink/runtime/execution/Environment.java| 147 +---
 .../runtime/execution/RuntimeEnvironment.java   | 732 
 .../flink/runtime/executiongraph/Execution.java | 254 --
 .../executiongraph/ExecutionAttemptID.java  |  16 +-
 .../runtime/executiongraph/ExecutionEdge.java   |  34 +-
 .../runtime/executiongraph/ExecutionGraph.java  | 311 ++-
 .../runtime/executiongraph/ExecutionVertex.java | 181 ++--
 .../executiongraph/IntermediateResult.java  |  52 +-
 .../IntermediateResultPartition.java|  55 +-
 .../disk/iomanager/AsynchronousBlockReader.java |  17 +-
 .../AsynchronousBlockWriterWithCallback.java|  17 +-
 .../iomanager/AsynchronousBulkBlockReader.java  |  17 +-
 .../iomanager/AsynchronousFileIOChannel.java|  55 +-
 .../runtime/io/disk/iomanager/IOManager.java|  73 +-
 .../io/disk/iomanager/IOManagerAsync.java   |  18 +-
 .../runtime/io/disk/iomanager/IORequest.java|   2 -
 .../io/disk/iomanager/QueuingCallback.java  |   2 +-
 .../io/disk/iomanager/RequestDoneCallback.java  |  16 +-
 .../apache/flink/runtime/io/network/Buffer.java |  98 ---
 .../runtime/io/network/BufferRecycler.java  |  32 -
 .../runtime/io/network/ChannelManager.java  | 692 ---
 .../network/ConnectionInfoLookupResponse.java   | 111 ---
 .../runtime/io/network/ConnectionManager.java   |  42 +
 .../flink/runtime/io/network/Envelope.java  | 183 
 .../runtime/io/network/EnvelopeDispatcher.java  |  52 --
 .../io/network/EnvelopeReceiverList.java|  81 --
 .../network/InsufficientResourcesException.java |  43 -
 .../io/network/LocalConnectionManager.java  |  19 +-
 .../LocalReceiverCancelledException.java|  41 -
 .../io/network/NetworkConnectionManager.java|  35 -
 .../runtime/io/network/NetworkEnvironment.java  | 277 ++
 .../flink/runtime/io/network/RemoteAddress.java | 122 +++
 .../runtime/io/network/RemoteReceiver.java  | 150 
 .../runtime/io/network/SenderHintEvent.java | 122 ---
 .../runtime/io/network/TaskEventDispatcher.java |  88 ++
 .../io/network/api/AbstractRecordReader.java| 104 ---
 

flink git commit: Undo erroneous commit 86f87537

2015-01-12 Thread rmetzger
Repository: flink
Updated Branches:
  refs/heads/master 9f07373f3 -> aba76171f


Undo erroneous commit 86f87537


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aba76171
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aba76171
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aba76171

Branch: refs/heads/master
Commit: aba76171fef41e2c987913c32fefafc55ef635f6
Parents: 9f07373
Author: Robert Metzger rmetz...@apache.org
Authored: Mon Jan 12 13:26:57 2015 +0100
Committer: Robert Metzger rmetz...@apache.org
Committed: Mon Jan 12 13:26:57 2015 +0100

--
 flink-quickstart/quickstart-scala.sh | 2 +-
 flink-quickstart/quickstart.sh   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/aba76171/flink-quickstart/quickstart-scala.sh
--
diff --git a/flink-quickstart/quickstart-scala.sh 
b/flink-quickstart/quickstart-scala.sh
index 019f6b0..0690f3c 100755
--- a/flink-quickstart/quickstart-scala.sh
+++ b/flink-quickstart/quickstart-scala.sh
@@ -24,7 +24,7 @@ PACKAGE=quickstart
 mvn archetype:generate 
\
   -DarchetypeGroupId=org.apache.flink  \
   -DarchetypeArtifactId=flink-quickstart-scala \
-  -DarchetypeVersion=0.8.0 \
+  -DarchetypeVersion=0.7.0-incubating  \
   -DgroupId=org.apache.flink   \
   -DartifactId=$PACKAGE
\
   -Dversion=0.1
\

http://git-wip-us.apache.org/repos/asf/flink/blob/aba76171/flink-quickstart/quickstart.sh
--
diff --git a/flink-quickstart/quickstart.sh b/flink-quickstart/quickstart.sh
index 92309d4..568ccc6 100755
--- a/flink-quickstart/quickstart.sh
+++ b/flink-quickstart/quickstart.sh
@@ -24,7 +24,7 @@ PACKAGE=quickstart
 mvn archetype:generate 
\
   -DarchetypeGroupId=org.apache.flink  \
   -DarchetypeArtifactId=flink-quickstart-java  \
-  -DarchetypeVersion=0.8.0 \
+  -DarchetypeVersion=0.7.0-incubating  \
   -DgroupId=org.apache.flink   \
   -DartifactId=$PACKAGE
\
   -Dversion=0.1
\



[5/5] flink git commit: [dist] Updated mailing list info in binary distribution

2015-01-12 Thread mbalassi
[dist] Updated mailing list info in binary distribution


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b4e9db09
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b4e9db09
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b4e9db09

Branch: refs/heads/release-0.8
Commit: b4e9db091ee95348dc85a27fee87d27edc4dde45
Parents: 4f95ce7
Author: mbalassi mbala...@apache.org
Authored: Mon Jan 12 10:50:55 2015 +0100
Committer: mbalassi mbala...@apache.org
Committed: Mon Jan 12 10:50:55 2015 +0100

--
 flink-dist/src/main/flink-bin/README.txt | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b4e9db09/flink-dist/src/main/flink-bin/README.txt
--
diff --git a/flink-dist/src/main/flink-bin/README.txt 
b/flink-dist/src/main/flink-bin/README.txt
index 46fab4d..4ebee19 100644
--- a/flink-dist/src/main/flink-bin/README.txt
+++ b/flink-dist/src/main/flink-bin/README.txt
@@ -8,8 +8,8 @@ and our GitHub Account
 
 If you have any questions, ask on our Mailing lists:
 
-   u...@flink.incubator.apache.org
-   d...@flink.incubator.apache.org
+   u...@flink.apache.org
+   d...@flink.apache.org
 
 This distribution includes cryptographic software.  The country in 
 which you currently reside may have restrictions on the import, 



[1/5] flink git commit: [docs] Prepare documentation for 0.8 release

2015-01-12 Thread mbalassi
Repository: flink
Updated Branches:
  refs/heads/release-0.8 ada35eb05 -> b4e9db091


[docs] Prepare documentation for 0.8 release


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7cd0f477
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7cd0f477
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7cd0f477

Branch: refs/heads/release-0.8
Commit: 7cd0f47787d37add0ae6b87b7dd0dd454dfb04cb
Parents: ada35eb
Author: Robert Metzger metzg...@web.de
Authored: Sat Jan 10 12:21:56 2015 +0100
Committer: mbalassi mbala...@apache.org
Committed: Mon Jan 12 09:57:32 2015 +0100

--
 docs/README.md   |  2 +-
 docs/_config.yml | 10 +-
 docs/cli.md  | 12 ++--
 docs/cluster_execution.md|  4 ++--
 docs/example_connectors.md   |  8 
 docs/hadoop_compatibility.md |  2 +-
 docs/index.md|  4 ++--
 docs/java_api_quickstart.md  |  2 +-
 docs/local_execution.md  |  2 +-
 docs/programming_guide.md| 12 ++--
 docs/scala_api_quickstart.md |  2 +-
 docs/setup_quickstart.md |  8 
 docs/spargel_guide.md|  2 +-
 docs/streaming_guide.md  |  2 +-
 docs/yarn_setup.md   | 14 +++---
 15 files changed, 43 insertions(+), 43 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/7cd0f477/docs/README.md
--
diff --git a/docs/README.md b/docs/README.md
index 8a19034..6dc76f2 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -44,7 +44,7 @@ title of the page. This title is used as the top-level 
heading for the page.
 
 Furthermore, you can access variables found in `docs/_config.yml` as follows:
 
-{{ site.FLINK_VERSION_STABLE }}
+{{ site.FLINK_VERSION_SHORT }}
 
 This will be replaced with the value of the variable when generating the docs.
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7cd0f477/docs/_config.yml
--
diff --git a/docs/_config.yml b/docs/_config.yml
index f9e3690..0fccf8c 100644
--- a/docs/_config.yml
+++ b/docs/_config.yml
@@ -22,9 +22,9 @@
 # {{ site.CONFIG_KEY }}
 #--
 
-FLINK_VERSION_STABLE: 0.7.0-incubating # this variable can point to a 
SNAPSHOT version in the git source.
-FLINK_VERSION_SHORT: 0.7.0
-FLINK_VERSION_HADOOP_2_STABLE: 0.7-hadoop2-incubating
+FLINK_VERSION_HADOOP1_STABLE: 0.8.0-hadoop1 # this variable can point to a 
SNAPSHOT version in the git source.
+FLINK_VERSION_SHORT: 0.8.0
+FLINK_VERSION_HADOOP2_STABLE: 0.8.0
 FLINK_SCALA_VERSION: 2.10.4
 FLINK_SCALA_VERSION_SHORT: 2.10
 FLINK_ISSUES_URL: https://issues.apache.org/jira/browse/FLINK
@@ -33,8 +33,8 @@ FLINK_GITHUB_URL:  https://github.com/apache/incubator-flink
 FLINK_WEBSITE_URL: http://flink.incubator.apache.org
 FLINK_DOWNLOAD_URL: http://flink.incubator.apache.org/downloads.html
 
-FLINK_DOWNLOAD_URL_HADOOP_1_STABLE: 
http://www.apache.org/dyn/closer.cgi/incubator/flink/flink-0.7.0-incubating/flink-0.7.0-incubating-bin-hadoop1.tgz
-FLINK_DOWNLOAD_URL_HADOOP_2_STABLE: 
http://www.apache.org/dyn/closer.cgi/incubator/flink/flink-0.7.0-incubating/flink-0.7.0-incubating-bin-hadoop2.tgz
+FLINK_DOWNLOAD_URL_HADOOP1_STABLE: 
http://www.apache.org/dyn/closer.cgi/incubator/flink/flink-0.7.0-incubating/flink-0.7.0-incubating-bin-hadoop1.tgz
+FLINK_DOWNLOAD_URL_HADOOP2_STABLE: 
http://www.apache.org/dyn/closer.cgi/incubator/flink/flink-0.7.0-incubating/flink-0.7.0-incubating-bin-hadoop2.tgz
 FLINK_DOWNLOAD_URL_YARN_STABLE: 
http://www.apache.org/dyn/closer.cgi/incubator/flink/flink-0.7.0-incubating/flink-0.7.0-incubating-bin-hadoop2-yarn.tgz
 FLINK_WGET_URL_YARN_STABLE: 
http://artfiles.org/apache.org/incubator/flink/flink-0.7.0-incubating/flink-0.7.0-incubating-bin-hadoop2-yarn.tgz
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7cd0f477/docs/cli.md
--
diff --git a/docs/cli.md b/docs/cli.md
index 7db31a6..ceab6e4 100644
--- a/docs/cli.md
+++ b/docs/cli.md
@@ -46,33 +46,33 @@ The command line can be used to
 
 -   Run example program with no arguments.
 
-./bin/flink run ./examples/flink-java-examples-{{ 
site.FLINK_VERSION_STABLE }}-WordCount.jar
+./bin/flink run ./examples/flink-java-examples-{{ 
site.FLINK_VERSION_SHORT }}-WordCount.jar
 
 -   Run example program with arguments for input and result files
 
-./bin/flink run ./examples/flink-java-examples-{{ 
site.FLINK_VERSION_STABLE }}-WordCount.jar \
+./bin/flink run ./examples/flink-java-examples-{{ 
site.FLINK_VERSION_SHORT }}-WordCount.jar \
file:///home/user/hamlet.txt 

[3/5] flink git commit: [FLINK-1385] Print warning if not resources are _currently_ available in the YARN cluster. Instead of rejecting the session

2015-01-12 Thread mbalassi
[FLINK-1385] Print warning if not resources are _currently_ available in the 
YARN cluster.
Instead of rejecting the session

This closes #294


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e83ccd03
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e83ccd03
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e83ccd03

Branch: refs/heads/release-0.8
Commit: e83ccd03cde884626588d729f6dc9e4d624c9faf
Parents: 7cd0f47
Author: Robert Metzger <metzg...@web.de>
Authored: Sat Jan 10 18:20:10 2015 +0100
Committer: mbalassi <mbala...@apache.org>
Committed: Mon Jan 12 09:57:33 2015 +0100

--
 .../main/java/org/apache/flink/yarn/Client.java | 21 +---
 1 file changed, 9 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e83ccd03/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
--
diff --git 
a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java 
b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
index 161aa8a..2100df9 100644
--- a/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
+++ b/flink-addons/flink-yarn/src/main/java/org/apache/flink/yarn/Client.java
@@ -394,25 +394,22 @@ public class Client {
yarnClient.stop();
System.exit(1);
}
+
+   final String NOTE = "\nThe Flink YARN client will try to 
allocate the YARN session, but maybe not all TaskManagers are " +
+   "connecting from the beginning. The allocation 
might take more time than usual!";
int totalMemoryRequired = jmMemory + tmMemory * 
taskManagerCount;
ClusterResourceDescription freeClusterMem = 
getCurrentFreeClusterResources(yarnClient);
if(freeClusterMem.totalFreeMemory < totalMemoryRequired) {
-   LOG.error("This YARN session requires 
"+totalMemoryRequired+"MB of memory in the cluster. "
-   + "There are currently only 
"+freeClusterMem.totalFreeMemory+"MB available.");
-   yarnClient.stop();
-   System.exit(1);
+   LOG.warn("This YARN session requires " + 
totalMemoryRequired + "MB of memory in the cluster. "
+   + "There are currently only " + 
freeClusterMem.totalFreeMemory + "MB available." + NOTE);
}
if( tmMemory > freeClusterMem.containerLimit) {
-   LOG.error("The requested amount of memory for the 
TaskManagers ("+tmMemory+"MB) is more than "
-   + "the largest possible YARN container: 
"+freeClusterMem.containerLimit);
-   yarnClient.stop();
-   System.exit(1);
+   LOG.warn("The requested amount of memory for the 
TaskManagers (" + tmMemory + "MB) is more than "
+   + "currently the largest possible YARN 
container: " + freeClusterMem.containerLimit + NOTE);
}
if( jmMemory > freeClusterMem.containerLimit) {
-   LOG.error("The requested amount of memory for the 
JobManager ("+jmMemory+"MB) is more than "
-   + "the largest possible YARN container: 
"+freeClusterMem.containerLimit);
-   yarnClient.stop();
-   System.exit(1);
+   LOG.warn("The requested amount of memory for the 
JobManager (" + jmMemory + "MB) is more than "
+   + "currently the largest possible YARN 
container: " + freeClusterMem.containerLimit + NOTE);
}
 
// respect custom JVM options in the YAML file