http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
deleted file mode 100644
index 1d51216..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ /dev/null
@@ -1,406 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.SerializationException;
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.api.streamvertex.StreamVertexException;
-import org.apache.flink.streaming.partitioner.ShufflePartitioner;
-import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.state.OperatorState;
-import org.apache.flink.util.InstantiationUtil;
-
-public class StreamConfig implements Serializable {
-
-       private static final long serialVersionUID = 1L;
-
-       private static final String INPUT_TYPE = "inputType_";
-       private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs";
-       private static final String NUMBER_OF_INPUTS = "numberOfInputs";
-       private static final String CHAINED_OUTPUTS = "chainedOutputs";
-       private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_";
-       private static final String IS_CHAINED_VERTEX = "isChainedSubtask";
-       private static final String OUTPUT_NAME = "outputName_";
-       private static final String PARTITIONER_OBJECT = "partitionerObject_";
-       private static final String VERTEX_NAME = "vertexName";
-       private static final String ITERATION_ID = "iteration-id";
-       private static final String OUTPUT_SELECTOR = "outputSelector";
-       private static final String DIRECTED_EMIT = "directedEmit";
-       private static final String SERIALIZEDUDF = "serializedudf";
-       private static final String USER_FUNCTION = "userfunction";
-       private static final String BUFFER_TIMEOUT = "bufferTimeout";
-       private static final String OPERATOR_STATES = "operatorStates";
-       private static final String TYPE_SERIALIZER_IN_1 = 
"typeSerializer_in_1";
-       private static final String TYPE_SERIALIZER_IN_2 = 
"typeSerializer_in_2";
-       private static final String TYPE_SERIALIZER_OUT_1 = 
"typeSerializer_out_1";
-       private static final String TYPE_SERIALIZER_OUT_2 = 
"typeSerializer_out_2";
-       private static final String ITERATON_WAIT = "iterationWait";
-       private static final String OUTPUTS = "outVertexNames";
-       private static final String EDGES_IN_ORDER = "rwOrder";
-
-       // DEFAULT VALUES
-
-       private static final long DEFAULT_TIMEOUT = 100;
-
-       // CONFIG METHODS
-
-       private Configuration config;
-
-       public StreamConfig(Configuration config) {
-               this.config = config;
-       }
-
-       public Configuration getConfiguration() {
-               return config;
-       }
-
-       public void setVertexName(String vertexName) {
-               config.setString(VERTEX_NAME, vertexName);
-       }
-
-       public String getTaskName() {
-               return config.getString(VERTEX_NAME, "Missing");
-       }
-
-       public void setTypeSerializerIn1(StreamRecordSerializer<?> serializer) {
-               setTypeSerializer(TYPE_SERIALIZER_IN_1, serializer);
-       }
-
-       public void setTypeSerializerIn2(StreamRecordSerializer<?> serializer) {
-               setTypeSerializer(TYPE_SERIALIZER_IN_2, serializer);
-       }
-
-       public void setTypeSerializerOut1(StreamRecordSerializer<?> serializer) 
{
-               setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
-       }
-
-       public void setTypeSerializerOut2(StreamRecordSerializer<?> serializer) 
{
-               setTypeSerializer(TYPE_SERIALIZER_OUT_2, serializer);
-       }
-
-       @SuppressWarnings("unchecked")
-       public <T> StreamRecordSerializer<T> getTypeSerializerIn1(ClassLoader 
cl) {
-               try {
-                       return (StreamRecordSerializer<T>) 
InstantiationUtil.readObjectFromConfig(this.config,
-                                       TYPE_SERIALIZER_IN_1, cl);
-               } catch (Exception e) {
-                       throw new RuntimeException("Could not instantiate 
serializer.");
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       public <T> StreamRecordSerializer<T> getTypeSerializerIn2(ClassLoader 
cl) {
-               try {
-                       return (StreamRecordSerializer<T>) 
InstantiationUtil.readObjectFromConfig(this.config,
-                                       TYPE_SERIALIZER_IN_2, cl);
-               } catch (Exception e) {
-                       throw new RuntimeException("Could not instantiate 
serializer.");
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       public <T> StreamRecordSerializer<T> getTypeSerializerOut1(ClassLoader 
cl) {
-               try {
-                       return (StreamRecordSerializer<T>) 
InstantiationUtil.readObjectFromConfig(this.config,
-                                       TYPE_SERIALIZER_OUT_1, cl);
-               } catch (Exception e) {
-                       throw new RuntimeException("Could not instantiate 
serializer.");
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       public <T> StreamRecordSerializer<T> getTypeSerializerOut2(ClassLoader 
cl) {
-               try {
-                       return (StreamRecordSerializer<T>) 
InstantiationUtil.readObjectFromConfig(this.config,
-                                       TYPE_SERIALIZER_OUT_2, cl);
-               } catch (Exception e) {
-                       throw new RuntimeException("Could not instantiate 
serializer.");
-               }
-       }
-
-       private void setTypeSerializer(String key, StreamRecordSerializer<?> 
typeWrapper) {
-               config.setBytes(key, SerializationUtils.serialize(typeWrapper));
-       }
-
-       public void setBufferTimeout(long timeout) {
-               config.setLong(BUFFER_TIMEOUT, timeout);
-       }
-
-       public long getBufferTimeout() {
-               return config.getLong(BUFFER_TIMEOUT, DEFAULT_TIMEOUT);
-       }
-
-       public void setUserInvokable(StreamInvokable<?, ?> invokableObject) {
-               if (invokableObject != null) {
-                       config.setClass(USER_FUNCTION, 
invokableObject.getClass());
-
-                       try {
-                               config.setBytes(SERIALIZEDUDF, 
SerializationUtils.serialize(invokableObject));
-                       } catch (SerializationException e) {
-                               throw new RuntimeException("Cannot serialize 
invokable object "
-                                               + invokableObject.getClass(), 
e);
-                       }
-               }
-       }
-
-       @SuppressWarnings({ "unchecked" })
-       public <T> T getUserInvokable(ClassLoader cl) {
-               try {
-                       return (T) 
InstantiationUtil.readObjectFromConfig(this.config, SERIALIZEDUDF, cl);
-               } catch (Exception e) {
-                       throw new StreamVertexException("Cannot instantiate 
user function", e);
-               }
-       }
-
-       public void setDirectedEmit(boolean directedEmit) {
-               config.setBoolean(DIRECTED_EMIT, directedEmit);
-       }
-
-       public boolean isDirectedEmit() {
-               return config.getBoolean(DIRECTED_EMIT, false);
-       }
-
-
-       public void setOutputSelectors(List<OutputSelector<?>> outputSelector) {
-               try {
-                       if (outputSelector != null) {
-                               setDirectedEmit(true);
-                               config.setBytes(OUTPUT_SELECTOR, 
SerializationUtils.serialize((Serializable) outputSelector));
-                       }
-               } catch (SerializationException e) {
-                       throw new RuntimeException("Cannot serialize 
OutputSelector");
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       public <T> List<OutputSelector<T>> getOutputSelectors(ClassLoader cl) {
-               try {
-                       return (List<OutputSelector<T>>) 
InstantiationUtil.readObjectFromConfig(this.config,
-                                       OUTPUT_SELECTOR, cl);
-               } catch (Exception e) {
-                       throw new StreamVertexException("Cannot deserialize and 
instantiate OutputSelector", e);
-               }
-       }
-
-       public void setIterationId(Integer iterationId) {
-               config.setInteger(ITERATION_ID, iterationId);
-       }
-
-       public Integer getIterationId() {
-               return config.getInteger(ITERATION_ID, 0);
-       }
-
-       public void setIterationWaitTime(long time) {
-               config.setLong(ITERATON_WAIT, time);
-       }
-
-       public long getIterationWaitTime() {
-               return config.getLong(ITERATON_WAIT, 0);
-       }
-
-       public <T> void setPartitioner(String output, StreamPartitioner<T> 
partitionerObject) {
-
-               config.setBytes(PARTITIONER_OBJECT + output,
-                               
SerializationUtils.serialize(partitionerObject));
-       }
-
-       @SuppressWarnings("unchecked")
-       public <T> StreamPartitioner<T> getPartitioner(ClassLoader cl, String 
output) {
-               StreamPartitioner<T> partitioner = null;
-               try {
-                       partitioner = (StreamPartitioner<T>) 
InstantiationUtil.readObjectFromConfig(
-                                       this.config, PARTITIONER_OBJECT + 
output, cl);
-               } catch (Exception e) {
-                       throw new RuntimeException("Partitioner could not be 
instantiated.");
-               }
-               if (partitioner != null) {
-                       return partitioner;
-               } else {
-                       return new ShufflePartitioner<T>();
-               }
-       }
-
-       public void setSelectedNames(String output, List<String> selected) {
-               if (selected != null) {
-                       config.setBytes(OUTPUT_NAME + output,
-                                       
SerializationUtils.serialize((Serializable) selected));
-               } else {
-                       config.setBytes(OUTPUT_NAME + output,
-                                       
SerializationUtils.serialize((Serializable) new ArrayList<String>()));
-               }
-       }
-
-       @SuppressWarnings("unchecked")
-       public List<String> getSelectedNames(String output) {
-               return (List<String>) 
SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME + output,
-                               null));
-       }
-
-       public void setNumberOfInputs(int numberOfInputs) {
-               config.setInteger(NUMBER_OF_INPUTS, numberOfInputs);
-       }
-
-       public int getNumberOfInputs() {
-               return config.getInteger(NUMBER_OF_INPUTS, 0);
-       }
-
-       public void setNumberOfOutputs(int numberOfOutputs) {
-               config.setInteger(NUMBER_OF_OUTPUTS, numberOfOutputs);
-       }
-
-       public int getNumberOfOutputs() {
-               return config.getInteger(NUMBER_OF_OUTPUTS, 0);
-       }
-
-       public void setOutputs(List<String> outputVertexNames) {
-               config.setBytes(OUTPUTS, 
SerializationUtils.serialize((Serializable) outputVertexNames));
-       }
-
-       @SuppressWarnings("unchecked")
-       public List<String> getOutputs(ClassLoader cl) {
-               try {
-                       return (List<String>) 
InstantiationUtil.readObjectFromConfig(this.config, OUTPUTS, cl);
-               } catch (Exception e) {
-                       throw new RuntimeException("Could not instantiate 
outputs.");
-               }
-       }
-
-       public void setOutEdgesInOrder(List<Tuple2<String, String>> 
outEdgeList) {
-
-               config.setBytes(EDGES_IN_ORDER, 
SerializationUtils.serialize((Serializable) outEdgeList));
-       }
-
-       @SuppressWarnings("unchecked")
-       public List<Tuple2<String, String>> getOutEdgesInOrder(ClassLoader cl) {
-               try {
-                       return (List<Tuple2<String, String>>) 
InstantiationUtil.readObjectFromConfig(
-                                       this.config, EDGES_IN_ORDER, cl);
-               } catch (Exception e) {
-                       throw new RuntimeException("Could not instantiate 
outputs.");
-               }
-       }
-
-       public void setInputIndex(int inputNumber, Integer inputTypeNumber) {
-               config.setInteger(INPUT_TYPE + inputNumber++, inputTypeNumber);
-       }
-
-       public int getInputIndex(int inputNumber) {
-               return config.getInteger(INPUT_TYPE + inputNumber, 0);
-       }
-
-       public void setOperatorStates(Map<String, OperatorState<?>> states) {
-               config.setBytes(OPERATOR_STATES, 
SerializationUtils.serialize((Serializable) states));
-       }
-
-       @SuppressWarnings("unchecked")
-       public Map<String, OperatorState<?>> getOperatorStates(ClassLoader cl) {
-               try {
-                       return (Map<String, OperatorState<?>>) 
InstantiationUtil.readObjectFromConfig(
-                                       this.config, OPERATOR_STATES, cl);
-               } catch (Exception e) {
-                       throw new RuntimeException("Could not load operator 
state");
-               }
-       }
-
-       public void setChainedOutputs(List<String> chainedOutputs) {
-               config.setBytes(CHAINED_OUTPUTS,
-                               SerializationUtils.serialize((Serializable) 
chainedOutputs));
-       }
-
-       @SuppressWarnings("unchecked")
-       public List<String> getChainedOutputs(ClassLoader cl) {
-               try {
-                       return (List<String>) 
InstantiationUtil.readObjectFromConfig(this.config,
-                                       CHAINED_OUTPUTS, cl);
-               } catch (Exception e) {
-                       throw new RuntimeException("Could not instantiate 
chained outputs.");
-               }
-       }
-
-       public void setTransitiveChainedTaskConfigs(Map<String, StreamConfig> 
chainedTaskConfigs) {
-               config.setBytes(CHAINED_TASK_CONFIG,
-                               SerializationUtils.serialize((Serializable) 
chainedTaskConfigs));
-       }
-
-       @SuppressWarnings("unchecked")
-       public Map<String, StreamConfig> 
getTransitiveChainedTaskConfigs(ClassLoader cl) {
-               try {
-
-                       Map<String, StreamConfig> confs = (Map<String, 
StreamConfig>) InstantiationUtil
-                                       .readObjectFromConfig(this.config, 
CHAINED_TASK_CONFIG, cl);
-
-                       return confs == null ? new HashMap<String, 
StreamConfig>() : confs;
-               } catch (Exception e) {
-                       throw new RuntimeException("Could not instantiate 
configuration.");
-               }
-       }
-
-       public void setChainStart() {
-               config.setBoolean(IS_CHAINED_VERTEX, true);
-       }
-
-       public boolean isChainStart() {
-               return config.getBoolean(IS_CHAINED_VERTEX, false);
-       }
-
-       @Override
-       public String toString() {
-
-               ClassLoader cl = getClass().getClassLoader();
-
-               StringBuilder builder = new StringBuilder();
-               builder.append("\n=======================");
-               builder.append("Stream Config");
-               builder.append("=======================");
-               builder.append("\nTask name: " + getTaskName());
-               builder.append("\nNumber of non-chained inputs: " + 
getNumberOfInputs());
-               builder.append("\nNumber of non-chained outputs: " + 
getNumberOfOutputs());
-               builder.append("\nOutput names: " + getOutputs(cl));
-               builder.append("\nPartitioning:");
-               for (String outputname : getOutputs(cl)) {
-                       builder.append("\n\t" + outputname + ": "
-                                       + getPartitioner(cl, 
outputname).getClass().getSimpleName());
-               }
-
-               builder.append("\nChained subtasks: " + getChainedOutputs(cl));
-
-               try {
-                       builder.append("\nInvokable: " + 
getUserInvokable(cl).getClass().getSimpleName());
-               } catch (Exception e) {
-                       builder.append("\nInvokable: Missing");
-               }
-               builder.append("\nBuffer timeout: " + getBufferTimeout());
-               if (isChainStart() && getChainedOutputs(cl).size() > 0) {
-                       builder.append("\n\n\n---------------------\nChained 
task configs\n---------------------\n");
-                       
builder.append(getTransitiveChainedTaskConfigs(cl)).toString();
-               }
-
-               return builder.toString();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
deleted file mode 100644
index b5e43af..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ /dev/null
@@ -1,620 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.compiler.plan.StreamingPlan;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.streaming.api.collector.OutputSelector;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.api.streamvertex.CoStreamVertex;
-import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
-import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
-import org.apache.flink.streaming.api.streamvertex.StreamVertex;
-import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.state.OperatorState;
-import org.apache.sling.commons.json.JSONArray;
-import org.apache.sling.commons.json.JSONException;
-import org.apache.sling.commons.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Object for building Apache Flink stream processing graphs
- */
-public class StreamGraph extends StreamingPlan {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(StreamGraph.class);
-       private final static String DEAFULT_JOB_NAME = "Flink Streaming Job";
-
-       protected boolean chaining = true;
-       private String jobName = DEAFULT_JOB_NAME;
-
-       // Graph attributes
-       private Map<String, Integer> operatorParallelisms;
-       private Map<String, Long> bufferTimeouts;
-       private Map<String, List<String>> outEdgeLists;
-       private Map<String, List<Integer>> outEdgeTypes;
-       private Map<String, List<List<String>>> selectedNames;
-       private Map<String, List<String>> inEdgeLists;
-       private Map<String, List<StreamPartitioner<?>>> outputPartitioners;
-       private Map<String, String> operatorNames;
-       private Map<String, StreamInvokable<?, ?>> invokableObjects;
-       private Map<String, StreamRecordSerializer<?>> typeSerializersIn1;
-       private Map<String, StreamRecordSerializer<?>> typeSerializersIn2;
-       private Map<String, StreamRecordSerializer<?>> typeSerializersOut1;
-       private Map<String, StreamRecordSerializer<?>> typeSerializersOut2;
-       private Map<String, Class<? extends AbstractInvokable>> 
jobVertexClasses;
-       private Map<String, List<OutputSelector<?>>> outputSelectors;
-       private Map<String, Integer> iterationIds;
-       private Map<Integer, String> iterationIDtoHeadName;
-       private Map<Integer, String> iterationIDtoTailName;
-       private Map<String, Integer> iterationTailCount;
-       private Map<String, Long> iterationTimeouts;
-       private Map<String, Map<String, OperatorState<?>>> operatorStates;
-       private Map<String, InputFormat<String, ?>> inputFormatLists;
-
-       private Set<String> sources;
-
-       public StreamGraph() {
-
-               initGraph();
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("StreamGraph created");
-               }
-       }
-
-       public void initGraph() {
-               operatorParallelisms = new HashMap<String, Integer>();
-               bufferTimeouts = new HashMap<String, Long>();
-               outEdgeLists = new HashMap<String, List<String>>();
-               outEdgeTypes = new HashMap<String, List<Integer>>();
-               selectedNames = new HashMap<String, List<List<String>>>();
-               inEdgeLists = new HashMap<String, List<String>>();
-               outputPartitioners = new HashMap<String, 
List<StreamPartitioner<?>>>();
-               operatorNames = new HashMap<String, String>();
-               invokableObjects = new HashMap<String, StreamInvokable<?, ?>>();
-               typeSerializersIn1 = new HashMap<String, 
StreamRecordSerializer<?>>();
-               typeSerializersIn2 = new HashMap<String, 
StreamRecordSerializer<?>>();
-               typeSerializersOut1 = new HashMap<String, 
StreamRecordSerializer<?>>();
-               typeSerializersOut2 = new HashMap<String, 
StreamRecordSerializer<?>>();
-               outputSelectors = new HashMap<String, 
List<OutputSelector<?>>>();
-               jobVertexClasses = new HashMap<String, Class<? extends 
AbstractInvokable>>();
-               iterationIds = new HashMap<String, Integer>();
-               iterationIDtoHeadName = new HashMap<Integer, String>();
-               iterationIDtoTailName = new HashMap<Integer, String>();
-               iterationTailCount = new HashMap<String, Integer>();
-               iterationTimeouts = new HashMap<String, Long>();
-               operatorStates = new HashMap<String, Map<String, 
OperatorState<?>>>();
-               inputFormatLists = new HashMap<String, InputFormat<String, 
?>>();
-               sources = new HashSet<String>();
-       }
-
-       /**
-        * Adds a vertex to the streaming graph with the given parameters
-        * 
-        * @param vertexName
-        *            Name of the vertex
-        * @param invokableObject
-        *            User defined operator
-        * @param inTypeInfo
-        *            Input type for serialization
-        * @param outTypeInfo
-        *            Output type for serialization
-        * @param operatorName
-        *            Operator type
-        * @param parallelism
-        *            Number of parallel instances created
-        */
-       public <IN, OUT> void addStreamVertex(String vertexName,
-                       StreamInvokable<IN, OUT> invokableObject, 
TypeInformation<IN> inTypeInfo,
-                       TypeInformation<OUT> outTypeInfo, String operatorName, 
int parallelism) {
-
-               addVertex(vertexName, StreamVertex.class, invokableObject, 
operatorName, parallelism);
-
-               StreamRecordSerializer<IN> inSerializer = inTypeInfo != null ? 
new StreamRecordSerializer<IN>(
-                               inTypeInfo) : null;
-               StreamRecordSerializer<OUT> outSerializer = outTypeInfo != null 
? new StreamRecordSerializer<OUT>(
-                               outTypeInfo) : null;
-
-               addTypeSerializers(vertexName, inSerializer, null, 
outSerializer, null);
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Vertex: {}", vertexName);
-               }
-       }
-
-       public <IN, OUT> void addSourceVertex(String vertexName,
-                       StreamInvokable<IN, OUT> invokableObject, 
TypeInformation<IN> inTypeInfo,
-                       TypeInformation<OUT> outTypeInfo, String operatorName, 
int parallelism) {
-               addStreamVertex(vertexName, invokableObject, inTypeInfo, 
outTypeInfo, operatorName,
-                               parallelism);
-               sources.add(vertexName);
-       }
-
-       /**
-        * Adds a vertex for the iteration head to the {@link JobGraph}. The
-        * iterated values will be fed from this vertex back to the graph.
-        * 
-        * @param vertexName
-        *            Name of the vertex
-        * @param iterationHead
-        *            Id of the iteration head
-        * @param iterationID
-        *            ID of iteration for multiple iterations
-        * @param parallelism
-        *            Number of parallel instances created
-        * @param waitTime
-        *            Max wait time for next record
-        */
-       public void addIterationHead(String vertexName, String iterationHead, 
Integer iterationID,
-                       int parallelism, long waitTime) {
-
-               addVertex(vertexName, StreamIterationHead.class, null, null, 
parallelism);
-
-               chaining = false;
-
-               iterationIds.put(vertexName, iterationID);
-               iterationIDtoHeadName.put(iterationID, vertexName);
-
-               setSerializersFrom(iterationHead, vertexName);
-
-               setEdge(vertexName, iterationHead,
-                               
outputPartitioners.get(inEdgeLists.get(iterationHead).get(0)).get(0), 0,
-                               new ArrayList<String>());
-
-               iterationTimeouts.put(iterationIDtoHeadName.get(iterationID), 
waitTime);
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("ITERATION SOURCE: {}", vertexName);
-               }
-
-               sources.add(vertexName);
-       }
-
-       /**
-        * Adds a vertex for the iteration tail to the {@link JobGraph}. The 
values
-        * intended to be iterated will be sent to this sink from the iteration
-        * head.
-        * 
-        * @param vertexName
-        *            Name of the vertex
-        * @param iterationTail
-        *            Id of the iteration tail
-        * @param iterationID
-        *            ID of iteration for mulitple iterations
-        * @param parallelism
-        *            Number of parallel instances created
-        * @param waitTime
-        *            Max waiting time for next record
-        */
-       public void addIterationTail(String vertexName, String iterationTail, 
Integer iterationID,
-                       long waitTime) {
-
-               if (bufferTimeouts.get(iterationTail) == 0) {
-                       throw new RuntimeException("Buffer timeout 0 at 
iteration tail is not supported.");
-               }
-
-               addVertex(vertexName, StreamIterationTail.class, null, null, 
getParallelism(iterationTail));
-
-               iterationIds.put(vertexName, iterationID);
-               iterationIDtoTailName.put(iterationID, vertexName);
-
-               setSerializersFrom(iterationTail, vertexName);
-               iterationTimeouts.put(iterationIDtoTailName.get(iterationID), 
waitTime);
-
-               setParallelism(iterationIDtoHeadName.get(iterationID), 
getParallelism(iterationTail));
-               setBufferTimeout(iterationIDtoHeadName.get(iterationID), 
bufferTimeouts.get(iterationTail));
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("ITERATION SINK: {}", vertexName);
-               }
-
-       }
-
-       public <IN1, IN2, OUT> void addCoTask(String vertexName,
-                       CoInvokable<IN1, IN2, OUT> taskInvokableObject, 
TypeInformation<IN1> in1TypeInfo,
-                       TypeInformation<IN2> in2TypeInfo, TypeInformation<OUT> 
outTypeInfo,
-                       String operatorName, int parallelism) {
-
-               addVertex(vertexName, CoStreamVertex.class, 
taskInvokableObject, operatorName, parallelism);
-
-               addTypeSerializers(vertexName, new 
StreamRecordSerializer<IN1>(in1TypeInfo),
-                               new StreamRecordSerializer<IN2>(in2TypeInfo), 
new StreamRecordSerializer<OUT>(
-                                               outTypeInfo), null);
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("CO-TASK: {}", vertexName);
-               }
-       }
-
-       /**
-        * Sets vertex parameters in the JobGraph
-        * 
-        * @param vertexName
-        *            Name of the vertex
-        * @param vertexClass
-        *            The class of the vertex
-        * @param invokableObjectject
-        *            The user defined invokable object
-        * @param operatorName
-        *            Type of the user defined operator
-        * @param parallelism
-        *            Number of parallel instances created
-        */
-       private void addVertex(String vertexName, Class<? extends 
AbstractInvokable> vertexClass,
-                       StreamInvokable<?, ?> invokableObject, String 
operatorName, int parallelism) {
-
-               jobVertexClasses.put(vertexName, vertexClass);
-               setParallelism(vertexName, parallelism);
-               invokableObjects.put(vertexName, invokableObject);
-               operatorNames.put(vertexName, operatorName);
-               outEdgeLists.put(vertexName, new ArrayList<String>());
-               outEdgeTypes.put(vertexName, new ArrayList<Integer>());
-               selectedNames.put(vertexName, new ArrayList<List<String>>());
-               outputSelectors.put(vertexName, new 
ArrayList<OutputSelector<?>>());
-               inEdgeLists.put(vertexName, new ArrayList<String>());
-               outputPartitioners.put(vertexName, new 
ArrayList<StreamPartitioner<?>>());
-               iterationTailCount.put(vertexName, 0);
-       }
-
-       /**
-        * Connects two vertices in the JobGraph using the selected partitioner
-        * settings
-        * 
-        * @param upStreamVertexName
-        *            Name of the upstream(output) vertex
-        * @param downStreamVertexName
-        *            Name of the downstream(input) vertex
-        * @param partitionerObject
-        *            Partitioner object
-        * @param typeNumber
-        *            Number of the type (used at co-functions)
-        * @param outputNames
-        *            User defined names of the out edge
-        */
-       public void setEdge(String upStreamVertexName, String 
downStreamVertexName,
-                       StreamPartitioner<?> partitionerObject, int typeNumber, 
List<String> outputNames) {
-               outEdgeLists.get(upStreamVertexName).add(downStreamVertexName);
-               outEdgeTypes.get(upStreamVertexName).add(typeNumber);
-               inEdgeLists.get(downStreamVertexName).add(upStreamVertexName);
-               
outputPartitioners.get(upStreamVertexName).add(partitionerObject);
-               selectedNames.get(upStreamVertexName).add(outputNames);
-       }
-
-       private void addTypeSerializers(String vertexName, 
StreamRecordSerializer<?> in1,
-                       StreamRecordSerializer<?> in2, 
StreamRecordSerializer<?> out1,
-                       StreamRecordSerializer<?> out2) {
-               typeSerializersIn1.put(vertexName, in1);
-               typeSerializersIn2.put(vertexName, in2);
-               typeSerializersOut1.put(vertexName, out1);
-               typeSerializersOut2.put(vertexName, out2);
-       }
-
-       /**
-        * Sets the number of parallel instances created for the given vertex.
-        * 
-        * @param vertexName
-        *            Name of the vertex
-        * @param parallelism
-        *            Number of parallel instances created
-        */
-       public void setParallelism(String vertexName, int parallelism) {
-               operatorParallelisms.put(vertexName, parallelism);
-       }
-
-       public int getParallelism(String vertexName) {
-               return operatorParallelisms.get(vertexName);
-       }
-
-       /**
-        * Sets the input format for the given vertex.
-        * 
-        * @param vertexName
-        *            Name of the vertex
-        * @param inputFormat
-        *            input format of the file source associated with the given
-        *            vertex
-        */
-       public void setInputFormat(String vertexName, InputFormat<String, ?> 
inputFormat) {
-               inputFormatLists.put(vertexName, inputFormat);
-       }
-
-       public void setBufferTimeout(String vertexName, long bufferTimeout) {
-               this.bufferTimeouts.put(vertexName, bufferTimeout);
-       }
-
-       public long getBufferTimeout(String vertexName) {
-               return this.bufferTimeouts.get(vertexName);
-       }
-
-       public void addOperatorState(String veretxName, String stateName, 
OperatorState<?> state) {
-               Map<String, OperatorState<?>> states = 
operatorStates.get(veretxName);
-               if (states == null) {
-                       states = new HashMap<String, OperatorState<?>>();
-                       states.put(stateName, state);
-               } else {
-                       if (states.containsKey(stateName)) {
-                               throw new RuntimeException("State has already 
been registered with this name: "
-                                               + stateName);
-                       } else {
-                               states.put(stateName, state);
-                       }
-               }
-               operatorStates.put(veretxName, states);
-       }
-
-       /**
-        * Sets a user defined {@link OutputSelector} for the given operator. 
Used
-        * for directed emits.
-        * 
-        * @param vertexName
-        *            Name of the vertex for which the output selector will be 
set
-        * @param outputSelector
-        *            The user defined output selector.
-        */
-       public <T> void setOutputSelector(String vertexName, OutputSelector<T> 
outputSelector) {
-               outputSelectors.get(vertexName).add(outputSelector);
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Outputselector set for {}", vertexName);
-               }
-
-       }
-
-       public <IN, OUT> void setInvokable(String vertexName, 
StreamInvokable<IN, OUT> invokableObject) {
-               invokableObjects.put(vertexName, invokableObject);
-       }
-
-       public <OUT> void setOutType(String id, TypeInformation<OUT> outType) {
-               StreamRecordSerializer<OUT> serializer = new 
StreamRecordSerializer<OUT>(outType);
-               typeSerializersOut1.put(id, serializer);
-       }
-
-       public StreamInvokable<?, ?> getInvokable(String vertexName) {
-               return invokableObjects.get(vertexName);
-       }
-
-       @SuppressWarnings("unchecked")
-       public <OUT> StreamRecordSerializer<OUT> getOutSerializer1(String 
vertexName) {
-               return (StreamRecordSerializer<OUT>) 
typeSerializersOut1.get(vertexName);
-       }
-
-       @SuppressWarnings("unchecked")
-       public <OUT> StreamRecordSerializer<OUT> getOutSerializer2(String 
vertexName) {
-               return (StreamRecordSerializer<OUT>) 
typeSerializersOut2.get(vertexName);
-       }
-
-       @SuppressWarnings("unchecked")
-       public <IN> StreamRecordSerializer<IN> getInSerializer1(String 
vertexName) {
-               return (StreamRecordSerializer<IN>) 
typeSerializersIn1.get(vertexName);
-       }
-
-       @SuppressWarnings("unchecked")
-       public <IN> StreamRecordSerializer<IN> getInSerializer2(String 
vertexName) {
-               return (StreamRecordSerializer<IN>) 
typeSerializersIn2.get(vertexName);
-       }
-
-       /**
-        * Sets TypeSerializerWrapper from one vertex to another, used with some
-        * sinks.
-        * 
-        * @param from
-        *            from
-        * @param to
-        *            to
-        */
-       public void setSerializersFrom(String from, String to) {
-               operatorNames.put(to, operatorNames.get(from));
-
-               typeSerializersIn1.put(to, typeSerializersOut1.get(from));
-               typeSerializersIn2.put(to, typeSerializersOut2.get(from));
-               typeSerializersOut1.put(to, typeSerializersOut1.get(from));
-               typeSerializersOut2.put(to, typeSerializersOut2.get(from));
-       }
-
-       /**
-        * Gets the assembled {@link JobGraph} and adds a default name for it.
-        */
-       public JobGraph getJobGraph() {
-               return getJobGraph(jobName);
-       }
-
-       /**
-        * Gets the assembled {@link JobGraph} and adds a user specified name 
for
-        * it.
-        * 
-        * @param jobGraphName
-        *            name of the jobGraph
-        */
-       public JobGraph getJobGraph(String jobGraphName) {
-
-               this.jobName = jobGraphName;
-               StreamingJobGraphGenerator optimizer = new 
StreamingJobGraphGenerator(this);
-
-               return optimizer.createJobGraph(jobGraphName);
-       }
-
-       public void setJobName(String jobName) {
-               this.jobName = jobName;
-       }
-
-       public void setChaining(boolean chaining) {
-               this.chaining = chaining;
-       }
-
-       public Collection<String> getSources() {
-               return sources;
-       }
-
-       public List<String> getOutEdges(String vertexName) {
-               return outEdgeLists.get(vertexName);
-       }
-
-       public List<String> getInEdges(String vertexName) {
-               return inEdgeLists.get(vertexName);
-       }
-
-       public List<Integer> getOutEdgeTypes(String vertexName) {
-
-               return outEdgeTypes.get(vertexName);
-       }
-
-       public StreamPartitioner<?> getOutPartitioner(String upStreamVertex, 
String downStreamVertex) {
-               return outputPartitioners.get(upStreamVertex).get(
-                               
outEdgeLists.get(upStreamVertex).indexOf(downStreamVertex));
-       }
-
-       public List<String> getSelectedNames(String upStreamVertex, String 
downStreamVertex) {
-
-               return selectedNames.get(upStreamVertex).get(
-                               
outEdgeLists.get(upStreamVertex).indexOf(downStreamVertex));
-       }
-
-       public Collection<Integer> getIterationIDs() {
-               return new HashSet<Integer>(iterationIds.values());
-       }
-
-       public String getIterationTail(int iterID) {
-               return iterationIDtoTailName.get(iterID);
-       }
-
-       public String getIterationHead(int iterID) {
-               return iterationIDtoHeadName.get(iterID);
-       }
-
-       public Class<? extends AbstractInvokable> getJobVertexClass(String 
vertexName) {
-               return jobVertexClasses.get(vertexName);
-       }
-
-       public InputFormat<String, ?> getInputFormat(String vertexName) {
-               return inputFormatLists.get(vertexName);
-       }
-
-       public List<OutputSelector<?>> getOutputSelector(String vertexName) {
-               return outputSelectors.get(vertexName);
-       }
-
-       public Map<String, OperatorState<?>> getState(String vertexName) {
-               return operatorStates.get(vertexName);
-       }
-
-       public Integer getIterationID(String vertexName) {
-               return iterationIds.get(vertexName);
-       }
-
-       public long getIterationTimeout(String vertexName) {
-               return iterationTimeouts.get(vertexName);
-       }
-
-       public String getOperatorName(String vertexName) {
-               return operatorNames.get(vertexName);
-       }
-
-       @Override
-       public String getStreamingPlanAsJSON() {
-
-               try {
-                       JSONObject json = new JSONObject();
-                       JSONArray nodes = new JSONArray();
-
-                       json.put("nodes", nodes);
-
-                       for (String id : operatorNames.keySet()) {
-                               JSONObject node = new JSONObject();
-                               nodes.put(node);
-
-                               node.put("id", Integer.valueOf(id));
-                               node.put("type", getOperatorName(id));
-
-                               if (sources.contains(id)) {
-                                       node.put("pact", "Data Source");
-                               } else {
-                                       node.put("pact", "Data Stream");
-                               }
-
-                               node.put("contents", getOperatorName(id) + " at 
"
-                                               + 
getInvokable(id).getUserFunction().getClass().getSimpleName());
-                               node.put("parallelism", getParallelism(id));
-
-                               int numIn = getInEdges(id).size();
-                               if (numIn > 0) {
-
-                                       JSONArray inputs = new JSONArray();
-                                       node.put("predecessors", inputs);
-
-                                       for (int i = 0; i < numIn; i++) {
-
-                                               String inID = 
getInEdges(id).get(i);
-
-                                               JSONObject input = new 
JSONObject();
-                                               inputs.put(input);
-
-                                               input.put("id", 
Integer.valueOf(inID));
-                                               input.put("ship_strategy", 
getOutPartitioner(inID, id).getStrategy());
-                                               if (i == 0) {
-                                                       input.put("side", 
"first");
-                                               } else if (i == 1) {
-                                                       input.put("side", 
"second");
-                                               }
-                                       }
-                               }
-
-                       }
-                       return json.toString();
-               } catch (JSONException e) {
-                       if (LOG.isDebugEnabled()) {
-                               LOG.debug("JSON plan creation failed: {}", e);
-                       }
-                       return "";
-               }
-
-       }
-
-       @Override
-       public void dumpStreamingPlanAsJSON(File file) throws IOException {
-               PrintWriter pw = null;
-               try {
-                       pw = new PrintWriter(new FileOutputStream(file), false);
-                       pw.write(getStreamingPlanAsJSON());
-                       pw.flush();
-
-               } finally {
-                       if (pw != null) {
-                               pw.close();
-                       }
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
deleted file mode 100644
index 24ebf46..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.ScheduleMode;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import 
org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
-import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
-import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
-import org.apache.flink.streaming.partitioner.StreamPartitioner;
-import 
org.apache.flink.streaming.partitioner.StreamPartitioner.PartitioningStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamingJobGraphGenerator {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(StreamingJobGraphGenerator.class);
-
-       private StreamGraph streamGraph;
-
-       private Map<String, AbstractJobVertex> streamVertices;
-       private JobGraph jobGraph;
-       private Collection<String> builtNodes;
-
-       private Map<String, Map<String, StreamConfig>> chainedConfigs;
-       private Map<String, StreamConfig> vertexConfigs;
-       private Map<String, String> chainedNames;
-
-       public StreamingJobGraphGenerator(StreamGraph streamGraph) {
-               this.streamGraph = streamGraph;
-       }
-
-       private void init() {
-               this.streamVertices = new HashMap<String, AbstractJobVertex>();
-               this.builtNodes = new HashSet<String>();
-               this.chainedConfigs = new HashMap<String, Map<String, 
StreamConfig>>();
-               this.vertexConfigs = new HashMap<String, StreamConfig>();
-               this.chainedNames = new HashMap<String, String>();
-       }
-
-       public JobGraph createJobGraph(String jobName) {
-               jobGraph = new JobGraph(jobName);
-               // Turn lazy scheduling off
-               jobGraph.setScheduleMode(ScheduleMode.ALL);
-
-               init();
-
-               for (String sourceName : streamGraph.getSources()) {
-                       createChain(sourceName, sourceName);
-               }
-
-               setSlotSharing();
-
-               return jobGraph;
-       }
-
-       private List<Tuple2<String, String>> createChain(String startNode, 
String current) {
-
-               if (!builtNodes.contains(startNode)) {
-
-                       List<Tuple2<String, String>> transitiveOutEdges = new 
ArrayList<Tuple2<String, String>>();
-                       List<String> chainableOutputs = new ArrayList<String>();
-                       List<String> nonChainableOutputs = new 
ArrayList<String>();
-
-                       for (String outName : streamGraph.getOutEdges(current)) 
{
-                               if (isChainable(current, outName)) {
-                                       chainableOutputs.add(outName);
-                               } else {
-                                       nonChainableOutputs.add(outName);
-                               }
-                       }
-
-                       for (String chainable : chainableOutputs) {
-                               
transitiveOutEdges.addAll(createChain(startNode, chainable));
-                       }
-
-                       for (String nonChainable : nonChainableOutputs) {
-                               transitiveOutEdges.add(new Tuple2<String, 
String>(current, nonChainable));
-                               createChain(nonChainable, nonChainable);
-                       }
-
-                       chainedNames.put(current, createChainedName(current, 
chainableOutputs));
-
-                       StreamConfig config = current.equals(startNode) ? 
createProcessingVertex(startNode)
-                                       : new StreamConfig(new Configuration());
-
-                       setVertexConfig(current, config, chainableOutputs, 
nonChainableOutputs);
-
-                       if (current.equals(startNode)) {
-
-                               config.setChainStart();
-                               config.setOutEdgesInOrder(transitiveOutEdges);
-
-                               for (Tuple2<String, String> edge : 
transitiveOutEdges) {
-                                       connect(startNode, edge);
-                               }
-
-                               
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNode));
-
-                       } else {
-
-                               Map<String, StreamConfig> chainedConfs = 
chainedConfigs.get(startNode);
-
-                               if (chainedConfs == null) {
-                                       chainedConfigs.put(startNode, new 
HashMap<String, StreamConfig>());
-                               }
-                               chainedConfigs.get(startNode).put(current, 
config);
-                       }
-
-                       return transitiveOutEdges;
-
-               } else {
-                       return new ArrayList<Tuple2<String, String>>();
-               }
-       }
-
-       private String createChainedName(String vertexID, List<String> 
chainedOutputs) {
-               String vertexName = streamGraph.getOperatorName(vertexID);
-               if (chainedOutputs.size() > 1) {
-                       List<String> outputChainedNames = new 
ArrayList<String>();
-                       for (String chainable : chainedOutputs) {
-                               
outputChainedNames.add(chainedNames.get(chainable));
-                       }
-                       return vertexName + " -> (" + 
StringUtils.join(outputChainedNames, ", ") + ")";
-               } else if (chainedOutputs.size() == 1) {
-                       return vertexName + " -> " + 
chainedNames.get(chainedOutputs.get(0));
-               } else {
-                       return vertexName;
-               }
-
-       }
-
-       private StreamConfig createProcessingVertex(String vertexName) {
-
-               AbstractJobVertex vertex = new 
AbstractJobVertex(chainedNames.get(vertexName));
-
-               
vertex.setInvokableClass(streamGraph.getJobVertexClass(vertexName));
-               if (streamGraph.getParallelism(vertexName) > 0) {
-                       
vertex.setParallelism(streamGraph.getParallelism(vertexName));
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("Parallelism set: {} for {}", 
streamGraph.getParallelism(vertexName),
-                                       vertexName);
-               }
-
-               if (streamGraph.getInputFormat(vertexName) != null) {
-                       
vertex.setInputSplitSource(streamGraph.getInputFormat(vertexName));
-               }
-
-               streamVertices.put(vertexName, vertex);
-               builtNodes.add(vertexName);
-               jobGraph.addVertex(vertex);
-
-               return new StreamConfig(vertex.getConfiguration());
-       }
-
-       private void setVertexConfig(String vertexName, StreamConfig config,
-                       List<String> chainableOutputs, List<String> 
nonChainableOutputs) {
-
-               config.setVertexName(vertexName);
-               
config.setBufferTimeout(streamGraph.getBufferTimeout(vertexName));
-
-               
config.setTypeSerializerIn1(streamGraph.getInSerializer1(vertexName));
-               
config.setTypeSerializerIn2(streamGraph.getInSerializer2(vertexName));
-               
config.setTypeSerializerOut1(streamGraph.getOutSerializer1(vertexName));
-               
config.setTypeSerializerOut2(streamGraph.getOutSerializer2(vertexName));
-
-               config.setUserInvokable(streamGraph.getInvokable(vertexName));
-               
config.setOutputSelectors(streamGraph.getOutputSelector(vertexName));
-               config.setOperatorStates(streamGraph.getState(vertexName));
-
-               config.setNumberOfOutputs(nonChainableOutputs.size());
-               config.setOutputs(nonChainableOutputs);
-               config.setChainedOutputs(chainableOutputs);
-
-               Class<? extends AbstractInvokable> vertexClass = 
streamGraph.getJobVertexClass(vertexName);
-
-               if (vertexClass.equals(StreamIterationHead.class)
-                               || 
vertexClass.equals(StreamIterationTail.class)) {
-                       
config.setIterationId(streamGraph.getIterationID(vertexName));
-                       
config.setIterationWaitTime(streamGraph.getIterationTimeout(vertexName));
-               }
-
-               List<String> allOutputs = new 
ArrayList<String>(chainableOutputs);
-               allOutputs.addAll(nonChainableOutputs);
-
-               for (String output : allOutputs) {
-                       config.setSelectedNames(output, 
streamGraph.getSelectedNames(vertexName, output));
-               }
-
-               vertexConfigs.put(vertexName, config);
-       }
-
-       private <T> void connect(String headOfChain, Tuple2<String, String> 
edge) {
-
-               String upStreamVertexName = edge.f0;
-               String downStreamVertexName = edge.f1;
-
-               int outputIndex = 
streamGraph.getOutEdges(upStreamVertexName).indexOf(downStreamVertexName);
-
-               AbstractJobVertex headVertex = streamVertices.get(headOfChain);
-               AbstractJobVertex downStreamVertex = 
streamVertices.get(downStreamVertexName);
-
-               StreamConfig downStreamConfig = new 
StreamConfig(downStreamVertex.getConfiguration());
-               StreamConfig upStreamConfig = new 
StreamConfig(headVertex.getConfiguration());
-
-               List<Integer> outEdgeIndexList = 
streamGraph.getOutEdgeTypes(upStreamVertexName);
-               int numOfInputs = downStreamConfig.getNumberOfInputs();
-
-               downStreamConfig.setInputIndex(numOfInputs++, 
outEdgeIndexList.get(outputIndex));
-               downStreamConfig.setNumberOfInputs(numOfInputs);
-
-               StreamPartitioner<?> partitioner = 
streamGraph.getOutPartitioner(upStreamVertexName,
-                               downStreamVertexName);
-
-               upStreamConfig.setPartitioner(downStreamVertexName, 
partitioner);
-
-               if (partitioner.getStrategy() == PartitioningStrategy.FORWARD) {
-                       downStreamVertex.connectNewDataSetAsInput(headVertex, 
DistributionPattern.POINTWISE);
-               } else {
-                       downStreamVertex.connectNewDataSetAsInput(headVertex, 
DistributionPattern.ALL_TO_ALL);
-               }
-
-               if (LOG.isDebugEnabled()) {
-                       LOG.debug("CONNECTED: {} - {} -> {}", 
partitioner.getClass().getSimpleName(),
-                                       headOfChain, downStreamVertexName);
-               }
-       }
-
-       private boolean isChainable(String vertexName, String outName) {
-
-               StreamInvokable<?, ?> headInvokable = 
streamGraph.getInvokable(vertexName);
-               StreamInvokable<?, ?> outInvokable = 
streamGraph.getInvokable(outName);
-
-               return streamGraph.getInEdges(outName).size() == 1
-                               && outInvokable != null
-                               && outInvokable.getChainingStrategy() == 
ChainingStrategy.ALWAYS
-                               && (headInvokable.getChainingStrategy() == 
ChainingStrategy.HEAD || headInvokable
-                                               .getChainingStrategy() == 
ChainingStrategy.ALWAYS)
-                               && streamGraph.getOutPartitioner(vertexName, 
outName).getStrategy() == PartitioningStrategy.FORWARD
-                               && streamGraph.getParallelism(vertexName) == 
streamGraph.getParallelism(outName)
-                               && streamGraph.chaining;
-       }
-
-       private void setSlotSharing() {
-               SlotSharingGroup shareGroup = new SlotSharingGroup();
-
-               for (AbstractJobVertex vertex : streamVertices.values()) {
-                       vertex.setSlotSharingGroup(shareGroup);
-               }
-
-               for (Integer iterID : streamGraph.getIterationIDs()) {
-                       CoLocationGroup ccg = new CoLocationGroup();
-                       AbstractJobVertex tail = 
streamVertices.get(streamGraph.getIterationTail(iterID));
-                       AbstractJobVertex head = 
streamVertices.get(streamGraph.getIterationHead(iterID));
-
-                       ccg.addVertex(head);
-                       ccg.addVertex(tail);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
deleted file mode 100644
index 1281bf0..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/CollectorWrapper.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.flink.util.Collector;
-
-public class CollectorWrapper<OUT> implements Collector<OUT> {
-
-       private List<Collector<OUT>> outputs;
-
-       public CollectorWrapper() {
-               this.outputs = new LinkedList<Collector<OUT>>();
-       }
-
-       @SuppressWarnings("unchecked")
-       public void addCollector(Collector<?> output) {
-               outputs.add((Collector<OUT>) output);
-       }
-
-       @Override
-       public void collect(OUT record) {
-               for(Collector<OUT> output: outputs){
-                       output.collect(record);
-               }
-       }
-
-       @Override
-       public void close() {
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
deleted file mode 100755
index 4681cd3..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedCollectorWrapper.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.flink.util.Collector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A StreamCollector that uses user defined output names and a user defined
- * output selector to make directed emits.
- * 
- * @param <OUT>
- *            Type of the Tuple collected.
- */
-public class DirectedCollectorWrapper<OUT> extends CollectorWrapper<OUT> {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(DirectedCollectorWrapper.class);
-
-       List<OutputSelector<OUT>> outputSelectors;
-
-       protected Map<String, List<Collector<OUT>>> outputMap;
-
-       private List<Collector<OUT>> selectAllOutputs;
-       private Set<Collector<OUT>> emitted;
-
-       /**
-        * Creates a new DirectedStreamCollector
-        * 
-        * @param outputSelector
-        *            User defined {@link OutputSelector}
-        */
-       public DirectedCollectorWrapper(List<OutputSelector<OUT>> 
outputSelectors) {
-               this.outputSelectors = outputSelectors;
-               this.emitted = new HashSet<Collector<OUT>>();
-               this.selectAllOutputs = new LinkedList<Collector<OUT>>();
-               this.outputMap = new HashMap<String, List<Collector<OUT>>>();
-
-       }
-
-       @Override
-       public void addCollector(Collector<?> output) {
-               addCollector(output, new ArrayList<String>());
-       }
-
-       @SuppressWarnings("unchecked")
-       public void addCollector(Collector<?> output, List<String> 
selectedNames) {
-
-               if (selectedNames.isEmpty()) {
-                       selectAllOutputs.add((Collector<OUT>) output);
-               } else {
-                       for (String selectedName : selectedNames) {
-
-                               if (!outputMap.containsKey(selectedName)) {
-                                       outputMap.put(selectedName, new 
LinkedList<Collector<OUT>>());
-                                       
outputMap.get(selectedName).add((Collector<OUT>) output);
-                               } else {
-                                       if 
(!outputMap.get(selectedName).contains(output)) {
-                                               
outputMap.get(selectedName).add((Collector<OUT>) output);
-                                       }
-                               }
-
-                       }
-               }
-       }
-
-       @Override
-       public void collect(OUT record) {
-               emitted.clear();
-
-               for (Collector<OUT> output : selectAllOutputs) {
-                       output.collect(record);
-                       emitted.add(output);
-               }
-
-               for (OutputSelector<OUT> outputSelector : outputSelectors) {
-                       Iterable<String> outputNames = 
outputSelector.select(record);
-
-                       for (String outputName : outputNames) {
-                               List<Collector<OUT>> outputList = 
outputMap.get(outputName);
-                               if (outputList == null) {
-                                       if (LOG.isErrorEnabled()) {
-                                               String format = String.format(
-                                                               "Cannot emit 
because no output is selected with the name: %s",
-                                                               outputName);
-                                               LOG.error(format);
-
-                                       }
-                               } else {
-                                       for (Collector<OUT> output : 
outputList) {
-                                               if (!emitted.contains(output)) {
-                                                       output.collect(record);
-                                                       emitted.add(output);
-                                               }
-                                       }
-
-                               }
-
-                       }
-               }
-
-       }
-
-       @Override
-       public void close() {
-
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
deleted file mode 100644
index 6dbcff4..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import java.io.Serializable;
-
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.datastream.SplitDataStream;
-
-/**
- * Interface for defining an OutputSelector for a {@link SplitDataStream} using
- * the {@link SingleOutputStreamOperator#split} call. Every output object of a
- * {@link SplitDataStream} will run through this operator to select outputs.
- * 
- * @param <OUT>
- *            Type parameter of the split values.
- */
-public interface OutputSelector<OUT> extends Serializable {
-       /**
-        * Method for selecting output names for the emitted objects when using 
the
-        * {@link SingleOutputStreamOperator#split} method. The values will be
-        * emitted only to output names which are contained in the returned
-        * iterable.
-        * 
-        * @param value
-        *            Output object for which the output selection should be 
made.
-        */
-       public Iterable<String> select(OUT value);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
deleted file mode 100644
index 4551c5a..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamOutput.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.io.StreamRecordWriter;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class StreamOutput<OUT> implements Collector<OUT> {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(StreamOutput.class);
-
-       private RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output;
-       private SerializationDelegate<StreamRecord<OUT>> serializationDelegate;
-       private StreamRecord<OUT> streamRecord;
-       private int channelID;
-
-       public 
StreamOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output,
-                       int channelID, SerializationDelegate<StreamRecord<OUT>> 
serializationDelegate) {
-
-               this.serializationDelegate = serializationDelegate;
-
-               if (serializationDelegate != null) {
-                       this.streamRecord = serializationDelegate.getInstance();
-               } else {
-                       throw new RuntimeException("Serializer cannot be null");
-               }
-               this.channelID = channelID;
-               this.output = output;
-       }
-
-       public RecordWriter<SerializationDelegate<StreamRecord<OUT>>> 
getRecordWriter() {
-               return output;
-       }
-
-       @Override
-       public void collect(OUT record) {
-               streamRecord.setObject(record);
-               streamRecord.newId(channelID);
-               serializationDelegate.setInstance(streamRecord);
-
-               try {
-                       output.emit(serializationDelegate);
-               } catch (Exception e) {
-                       if (LOG.isErrorEnabled()) {
-                               LOG.error("Emit failed due to: {}", 
StringUtils.stringifyException(e));
-                       }
-               }
-       }
-
-       @Override
-       public void close() {
-               if (output instanceof StreamRecordWriter) {
-                       
((StreamRecordWriter<SerializationDelegate<StreamRecord<OUT>>>) output).close();
-               } else {
-                       try {
-                               output.flush();
-                       } catch (IOException e) {
-                               e.printStackTrace();
-                       }
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
deleted file mode 100755
index a6eade8..0000000
--- 
a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ /dev/null
@@ -1,419 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.datastream;
-
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.StreamGraph;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.function.co.CoWindowFunction;
-import org.apache.flink.streaming.api.function.co.RichCoMapFunction;
-import org.apache.flink.streaming.api.function.co.RichCoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
-import 
org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
-import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-
-/**
- * The ConnectedDataStream represents a stream for two different data types. It
- * can be used to apply transformations like {@link CoMapFunction} on two
- * {@link DataStream}s
- * 
- * @param <IN1>
- *            Type of the first input data steam.
- * @param <IN2>
- *            Type of the second input data stream.
- */
-public class ConnectedDataStream<IN1, IN2> {
-
-       protected StreamExecutionEnvironment environment;
-       protected StreamGraph jobGraphBuilder;
-       protected DataStream<IN1> dataStream1;
-       protected DataStream<IN2> dataStream2;
-
-       protected boolean isGrouped;
-       protected KeySelector<IN1, ?> keySelector1;
-       protected KeySelector<IN2, ?> keySelector2;
-
-       protected ConnectedDataStream(DataStream<IN1> input1, DataStream<IN2> 
input2) {
-               this.jobGraphBuilder = input1.streamGraph;
-               this.environment = input1.environment;
-               this.dataStream1 = input1.copy();
-               this.dataStream2 = input2.copy();
-
-               if ((input1 instanceof GroupedDataStream) && (input2 instanceof 
GroupedDataStream)) {
-                       this.isGrouped = true;
-                       this.keySelector1 = ((GroupedDataStream<IN1>) 
input1).keySelector;
-                       this.keySelector2 = ((GroupedDataStream<IN2>) 
input2).keySelector;
-               } else {
-                       this.isGrouped = false;
-                       this.keySelector1 = null;
-                       this.keySelector2 = null;
-               }
-       }
-
-       protected ConnectedDataStream(ConnectedDataStream<IN1, IN2> 
coDataStream) {
-               this.jobGraphBuilder = coDataStream.jobGraphBuilder;
-               this.environment = coDataStream.environment;
-               this.dataStream1 = coDataStream.getFirst();
-               this.dataStream2 = coDataStream.getSecond();
-               this.isGrouped = coDataStream.isGrouped;
-               this.keySelector1 = coDataStream.keySelector1;
-               this.keySelector2 = coDataStream.keySelector2;
-       }
-
-       public <F> F clean(F f) {
-               if 
(getExecutionEnvironment().getConfig().isClosureCleanerEnabled()) {
-                       ClosureCleaner.clean(f, true);
-               }
-               ClosureCleaner.ensureSerializable(f);
-               return f;
-       }
-
-       public StreamExecutionEnvironment getExecutionEnvironment() {
-               return environment;
-       }
-
-       /**
-        * Returns the first {@link DataStream}.
-        * 
-        * @return The first DataStream.
-        */
-       public DataStream<IN1> getFirst() {
-               return dataStream1.copy();
-       }
-
-       /**
-        * Returns the second {@link DataStream}.
-        * 
-        * @return The second DataStream.
-        */
-       public DataStream<IN2> getSecond() {
-               return dataStream2.copy();
-       }
-
-       /**
-        * Gets the type of the first input
-        * 
-        * @return The type of the first input
-        */
-       public TypeInformation<IN1> getInputType1() {
-               return dataStream1.getType();
-       }
-
-       /**
-        * Gets the type of the second input
-        * 
-        * @return The type of the second input
-        */
-       public TypeInformation<IN2> getInputType2() {
-               return dataStream2.getType();
-       }
-
-       /**
-        * GroupBy operation for connected data stream. Groups the elements of
-        * input1 and input2 according to keyPosition1 and keyPosition2. Used 
for
-        * applying function on grouped data streams for example
-        * {@link ConnectedDataStream#reduce}
-        * 
-        * @param keyPosition1
-        *            The field used to compute the hashcode of the elements in 
the
-        *            first input stream.
-        * @param keyPosition2
-        *            The field used to compute the hashcode of the elements in 
the
-        *            second input stream.
-        * @return @return The transformed {@link ConnectedDataStream}
-        */
-       public ConnectedDataStream<IN1, IN2> groupBy(int keyPosition1, int 
keyPosition2) {
-               return new ConnectedDataStream<IN1, 
IN2>(dataStream1.groupBy(keyPosition1),
-                               dataStream2.groupBy(keyPosition2));
-       }
-
-       /**
-        * GroupBy operation for connected data stream. Groups the elements of
-        * input1 and input2 according to keyPositions1 and keyPositions2. Used 
for
-        * applying function on grouped data streams for example
-        * {@link ConnectedDataStream#reduce}
-        * 
-        * @param keyPositions1
-        *            The fields used to group the first input stream.
-        * @param keyPositions2
-        *            The fields used to group the second input stream.
-        * @return @return The transformed {@link ConnectedDataStream}
-        */
-       public ConnectedDataStream<IN1, IN2> groupBy(int[] keyPositions1, int[] 
keyPositions2) {
-               return new ConnectedDataStream<IN1, 
IN2>(dataStream1.groupBy(keyPositions1),
-                               dataStream2.groupBy(keyPositions2));
-       }
-
-       /**
-        * GroupBy operation for connected data stream using key expressions. 
Groups
-        * the elements of input1 and input2 according to field1 and field2. A 
field
-        * expression is either the name of a public field or a getter method 
with
-        * parentheses of the {@link DataStream}S underlying type. A dot can be 
used
-        * to drill down into objects, as in {@code "field1.getInnerField2()" }.
-        * 
-        * @param field1
-        *            The grouping expression for the first input
-        * @param field2
-        *            The grouping expression for the second input
-        * @return The grouped {@link ConnectedDataStream}
-        */
-       public ConnectedDataStream<IN1, IN2> groupBy(String field1, String 
field2) {
-               return new ConnectedDataStream<IN1, 
IN2>(dataStream1.groupBy(field1),
-                               dataStream2.groupBy(field2));
-       }
-
-       /**
-        * GroupBy operation for connected data stream using key expressions. 
Groups
-        * the elements of input1 and input2 according to fields1 and fields2. A
-        * field expression is either the name of a public field or a getter 
method
-        * with parentheses of the {@link DataStream}S underlying type. A dot 
can be
-        * used to drill down into objects, as in {@code 
"field1.getInnerField2()" }
-        * .
-        * 
-        * @param fields1
-        *            The grouping expressions for the first input
-        * @param fields2
-        *            The grouping expressions for the second input
-        * @return The grouped {@link ConnectedDataStream}
-        */
-       public ConnectedDataStream<IN1, IN2> groupBy(String[] fields1, String[] 
fields2) {
-               return new ConnectedDataStream<IN1, 
IN2>(dataStream1.groupBy(fields1),
-                               dataStream2.groupBy(fields2));
-       }
-
-       /**
-        * GroupBy operation for connected data stream. Groups the elements of
-        * input1 and input2 using keySelector1 and keySelector2. Used for 
applying
-        * function on grouped data streams for example
-        * {@link ConnectedDataStream#reduce}
-        * 
-        * @param keySelector1
-        *            The {@link KeySelector} used for grouping the first input
-        * @param keySelector2
-        *            The {@link KeySelector} used for grouping the second input
-        * @return @return The transformed {@link ConnectedDataStream}
-        */
-       public ConnectedDataStream<IN1, IN2> groupBy(KeySelector<IN1, ?> 
keySelector1,
-                       KeySelector<IN2, ?> keySelector2) {
-               return new ConnectedDataStream<IN1, 
IN2>(dataStream1.groupBy(keySelector1),
-                               dataStream2.groupBy(keySelector2));
-       }
-
-       /**
-        * Applies a CoMap transformation on a {@link ConnectedDataStream} and 
maps
-        * the output to a common type. The transformation calls a
-        * {@link CoMapFunction#map1} for each element of the first input and
-        * {@link CoMapFunction#map2} for each element of the second input. Each
-        * CoMapFunction call returns exactly one element. The user can also 
extend
-        * {@link RichCoMapFunction} to gain access to other features provided 
by
-        * the {@link RichFuntion} interface.
-        * 
-        * @param coMapper
-        *            The CoMapFunction used to jointly transform the two input
-        *            DataStreams
-        * @return The transformed {@link DataStream}
-        */
-       public <OUT> SingleOutputStreamOperator<OUT, ?> map(CoMapFunction<IN1, 
IN2, OUT> coMapper) {
-               TypeInformation<OUT> outTypeInfo = 
TypeExtractor.createTypeInfo(CoMapFunction.class,
-                               coMapper.getClass(), 2, null, null);
-
-               return addCoFunction("Co-Map", outTypeInfo, new 
CoMapInvokable<IN1, IN2, OUT>(
-                               clean(coMapper)));
-
-       }
-
-       /**
-        * Applies a CoFlatMap transformation on a {@link ConnectedDataStream} 
and
-        * maps the output to a common type. The transformation calls a
-        * {@link CoFlatMapFunction#flatMap1} for each element of the first 
input
-        * and {@link CoFlatMapFunction#flatMap2} for each element of the second
-        * input. Each CoFlatMapFunction call returns any number of elements
-        * including none. The user can also extend {@link RichFlatMapFunction} 
to
-        * gain access to other features provided by the {@link RichFuntion}
-        * interface.
-        * 
-        * @param coFlatMapper
-        *            The CoFlatMapFunction used to jointly transform the two 
input
-        *            DataStreams
-        * @return The transformed {@link DataStream}
-        */
-       public <OUT> SingleOutputStreamOperator<OUT, ?> flatMap(
-                       CoFlatMapFunction<IN1, IN2, OUT> coFlatMapper) {
-               TypeInformation<OUT> outTypeInfo = 
TypeExtractor.createTypeInfo(CoFlatMapFunction.class,
-                               coFlatMapper.getClass(), 2, null, null);
-
-               return addCoFunction("Co-Flat Map", outTypeInfo, new 
CoFlatMapInvokable<IN1, IN2, OUT>(
-                               clean(coFlatMapper)));
-       }
-
-       /**
-        * Applies a reduce transformation on a {@link ConnectedDataStream} and 
maps
-        * the outputs to a common type. If the {@link ConnectedDataStream} is
-        * batched or windowed then the reduce transformation is applied on 
every
-        * sliding batch/window of the data stream. If the connected data 
stream is
-        * grouped then the reducer is applied on every group of elements 
sharing
-        * the same key. This type of reduce is much faster than reduceGroup 
since
-        * the reduce function can be applied incrementally. The user can also
-        * extend the {@link RichCoReduceFunction} to gain access to other 
features
-        * provided by the {@link RichFuntion} interface.
-        * 
-        * @param coReducer
-        *            The {@link CoReduceFunction} that will be called for every
-        *            element of the inputs.
-        * @return The transformed {@link DataStream}.
-        */
-       public <OUT> SingleOutputStreamOperator<OUT, ?> 
reduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
-
-               TypeInformation<OUT> outTypeInfo = 
TypeExtractor.createTypeInfo(CoReduceFunction.class,
-                               coReducer.getClass(), 2, null, null);
-
-               return addCoFunction("Co-Reduce", outTypeInfo, 
getReduceInvokable(clean(coReducer)));
-
-       }
-
-       /**
-        * Applies a CoWindow transformation on the connected DataStreams. The
-        * transformation calls the {@link CoWindowFunction#coWindow} method 
for for
-        * time aligned windows of the two data streams. System time is used as
-        * default to compute windows.
-        * 
-        * @param coWindowFunction
-        *            The {@link CoWindowFunction} that will be applied for the 
time
-        *            windows.
-        * @param windowSize
-        *            Size of the windows that will be aligned for both streams 
in
-        *            milliseconds.
-        * @param slideInterval
-        *            After every function call the windows will be slid by this
-        *            interval.
-        * 
-        * @return The transformed {@link DataStream}.
-        */
-       @SuppressWarnings("unchecked")
-       public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduce(
-                       CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long 
windowSize, long slideInterval) {
-               return windowReduce(coWindowFunction, windowSize, slideInterval,
-                               (TimestampWrapper<IN1>) 
SystemTimestamp.getWrapper(),
-                               (TimestampWrapper<IN2>) 
SystemTimestamp.getWrapper());
-       }
-
-       /**
-        * Applies a CoWindow transformation on the connected DataStreams. The
-        * transformation calls the {@link CoWindowFunction#coWindow} method for
-        * time aligned windows of the two data streams. The user can implement
-        * their own time stamps or use the system time by default.
-        * 
-        * @param coWindowFunction
-        *            The {@link CoWindowFunction} that will be applied for the 
time
-        *            windows.
-        * @param windowSize
-        *            Size of the windows that will be aligned for both 
streams. If
-        *            system time is used it is milliseconds. User defined time
-        *            stamps are assumed to be monotonically increasing.
-        * @param slideInterval
-        *            After every function call the windows will be slid by this
-        *            interval.
-        * 
-        * @param timestamp1
-        *            User defined time stamps for the first input.
-        * @param timestamp2
-        *            User defined time stamps for the second input.
-        * @return The transformed {@link DataStream}.
-        */
-       public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduce(
-                       CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long 
windowSize, long slideInterval,
-                       TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> 
timestamp2) {
-
-               if (windowSize < 1) {
-                       throw new IllegalArgumentException("Window size must be 
positive");
-               }
-               if (slideInterval < 1) {
-                       throw new IllegalArgumentException("Slide interval must 
be positive");
-               }
-
-               TypeInformation<OUT> outTypeInfo = 
TypeExtractor.createTypeInfo(CoWindowFunction.class,
-                               coWindowFunction.getClass(), 2, null, null);
-
-               return addCoFunction("Co-Window", outTypeInfo, new 
CoWindowInvokable<IN1, IN2, OUT>(
-                               clean(coWindowFunction), windowSize, 
slideInterval, timestamp1, timestamp2));
-
-       }
-
-       protected <OUT> CoInvokable<IN1, IN2, OUT> getReduceInvokable(
-                       CoReduceFunction<IN1, IN2, OUT> coReducer) {
-               CoReduceInvokable<IN1, IN2, OUT> invokable;
-               if (isGrouped) {
-                       invokable = new CoGroupedReduceInvokable<IN1, IN2, 
OUT>(clean(coReducer), keySelector1,
-                                       keySelector2);
-               } else {
-                       invokable = new CoReduceInvokable<IN1, IN2, 
OUT>(clean(coReducer));
-               }
-               return invokable;
-       }
-
-       public <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine(
-                       CoWindowFunction<IN1, IN2, OUT> coWindowFunction, 
TypeInformation<OUT> outTypeInfo,
-                       long windowSize, long slideInterval, 
TimestampWrapper<IN1> timestamp1,
-                       TimestampWrapper<IN2> timestamp2) {
-
-               if (windowSize < 1) {
-                       throw new IllegalArgumentException("Window size must be 
positive");
-               }
-               if (slideInterval < 1) {
-                       throw new IllegalArgumentException("Slide interval must 
be positive");
-               }
-
-               return addCoFunction("Co-Window", outTypeInfo, new 
CoWindowInvokable<IN1, IN2, OUT>(
-                               clean(coWindowFunction), windowSize, 
slideInterval, timestamp1, timestamp2));
-
-       }
-
-       public <OUT> SingleOutputStreamOperator<OUT, ?> addCoFunction(String 
functionName,
-                       TypeInformation<OUT> outTypeInfo, CoInvokable<IN1, IN2, 
OUT> functionInvokable) {
-
-               @SuppressWarnings({ "unchecked", "rawtypes" })
-               SingleOutputStreamOperator<OUT, ?> returnStream = new 
SingleOutputStreamOperator(
-                               environment, functionName, outTypeInfo, 
functionInvokable);
-
-               dataStream1.streamGraph.addCoTask(returnStream.getId(), 
functionInvokable,
-                               getInputType1(), getInputType2(), outTypeInfo, 
functionName,
-                               environment.getDegreeOfParallelism());
-
-               dataStream1.connectGraph(dataStream1, returnStream.getId(), 1);
-               dataStream1.connectGraph(dataStream2, returnStream.getId(), 2);
-
-               return returnStream;
-       }
-
-       protected ConnectedDataStream<IN1, IN2> copy() {
-               return new ConnectedDataStream<IN1, IN2>(this);
-       }
-
-}

Reply via email to