[hotfix] Cleanup routing of records in OperatorChain

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/28c6254e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/28c6254e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/28c6254e

Branch: refs/heads/tableOnCalcite
Commit: 28c6254ee385fe746e868a81b2207bf66b552174
Parents: e9c83ea
Author: Stephan Ewen <se...@apache.org>
Authored: Mon Feb 8 16:14:00 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Mon Feb 8 20:36:35 2016 +0100

----------------------------------------------------------------------
 .../BroadcastOutputSelectorWrapper.java         |  45 -------
 .../api/collector/selector/DirectedOutput.java  | 130 +++++++++++++++++++
 .../selector/DirectedOutputSelectorWrapper.java |  97 --------------
 .../selector/OutputSelectorWrapper.java         |   9 +-
 .../selector/OutputSelectorWrapperFactory.java  |  33 -----
 .../flink/streaming/api/graph/StreamConfig.java |  20 +--
 .../flink/streaming/api/graph/StreamNode.java   |  10 +-
 .../api/graph/StreamingJobGraphGenerator.java   |   2 +-
 .../streaming/runtime/io/CollectorWrapper.java  |  61 ---------
 .../streaming/runtime/tasks/OperatorChain.java  |  84 ++++++++++--
 .../flink/streaming/api/OutputSplitterTest.java |   2 +-
 .../runtime/tasks/StreamTaskTestHarness.java    |   9 +-
 12 files changed, 225 insertions(+), 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
deleted file mode 100644
index 7034b11..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/BroadcastOutputSelectorWrapper.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import java.util.ArrayList;
-
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-public class BroadcastOutputSelectorWrapper<OUT> implements 
OutputSelectorWrapper<OUT> {
-
-       private static final long serialVersionUID = 1L;
-       
-       private final ArrayList<Collector<StreamRecord<OUT>>> outputs;
-
-       public BroadcastOutputSelectorWrapper() {
-               outputs = new ArrayList<Collector<StreamRecord<OUT>>>();
-       }
-       
-       @Override
-       public void addCollector(Collector<StreamRecord<OUT>> output, 
StreamEdge edge) {
-               outputs.add(output);
-       }
-
-       @Override
-       public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT 
record) {
-               return outputs;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
new file mode 100644
index 0000000..52c50b3
--- /dev/null
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutput.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.collector.selector;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+
+public class DirectedOutput<OUT> implements Output<StreamRecord<OUT>> {
+       
+       private final OutputSelector<OUT>[] outputSelectors;
+
+       private final Output<StreamRecord<OUT>>[] selectAllOutputs;
+       
+       private final HashMap<String, Output<StreamRecord<OUT>>[]> outputMap;
+       
+       private final Output<StreamRecord<OUT>>[] allOutputs;
+
+       
+       @SuppressWarnings({"unchecked", "rawtypes"})
+       public DirectedOutput(
+                       List<OutputSelector<OUT>> outputSelectors,
+                       List<Tuple2<Output<StreamRecord<OUT>>, StreamEdge>> 
outputs)
+       {
+               this.outputSelectors = outputSelectors.toArray(new 
OutputSelector[outputSelectors.size()]);
+
+               this.allOutputs = new Output[outputs.size()];
+               for (int i = 0; i < outputs.size(); i++) {
+                       allOutputs[i] = outputs.get(i).f0;
+               }
+               
+               
+               HashSet<Output<StreamRecord<OUT>>> selectAllOutputs = new 
HashSet<Output<StreamRecord<OUT>>>();
+               HashMap<String, ArrayList<Output<StreamRecord<OUT>>>> outputMap 
= new HashMap<String, ArrayList<Output<StreamRecord<OUT>>>>();
+               
+               for (Tuple2<Output<StreamRecord<OUT>>, StreamEdge> outputPair : 
outputs) {
+                       final Output<StreamRecord<OUT>> output = outputPair.f0;
+                       final StreamEdge edge = outputPair.f1;
+       
+                       List<String> selectedNames = edge.getSelectedNames();
+
+                       if (selectedNames.isEmpty()) {
+                               selectAllOutputs.add(output);
+                       }
+                       else {
+                               for (String selectedName : selectedNames) {
+                                       if 
(!outputMap.containsKey(selectedName)) {
+                                               outputMap.put(selectedName, new 
ArrayList<Output<StreamRecord<OUT>>>());
+                                               
outputMap.get(selectedName).add(output);
+                                       }
+                                       else {
+                                               if 
(!outputMap.get(selectedName).contains(output)) {
+                                                       
outputMap.get(selectedName).add(output);
+                                               }
+                                       }
+                               }
+                       }
+               }
+               
+               this.selectAllOutputs = selectAllOutputs.toArray(new 
Output[selectAllOutputs.size()]);
+               
+               this.outputMap = new HashMap<>();
+               for (Map.Entry<String, ArrayList<Output<StreamRecord<OUT>>>> 
entry : outputMap.entrySet()) {
+                       Output<StreamRecord<OUT>>[] arr = 
entry.getValue().toArray(new Output[entry.getValue().size()]);
+                       this.outputMap.put(entry.getKey(), arr);
+               }
+       }
+
+
+       @Override
+       public void emitWatermark(Watermark mark) {
+               for (Output<StreamRecord<OUT>> out : allOutputs) {
+                       out.emitWatermark(mark);
+               }
+       }
+
+       @Override
+       public void collect(StreamRecord<OUT> record) {
+               Set<Output<StreamRecord<OUT>>> selectedOutputs = new 
HashSet<Output<StreamRecord<OUT>>>(selectAllOutputs.length);
+               Collections.addAll(selectedOutputs, selectAllOutputs);
+
+               for (OutputSelector<OUT> outputSelector : outputSelectors) {
+                       Iterable<String> outputNames = 
outputSelector.select(record.getValue());
+
+                       for (String outputName : outputNames) {
+                               Output<StreamRecord<OUT>>[] outputList = 
outputMap.get(outputName);
+                               if (outputList != null) {
+                                       Collections.addAll(selectedOutputs, 
outputList);
+                               }
+                       }
+               }
+               
+               for (Output<StreamRecord<OUT>> out : selectedOutputs) {
+                       out.collect(record);
+               }
+       }
+
+       @Override
+       public void close() {
+               for (Output<StreamRecord<OUT>> out : allOutputs) {
+                       out.close();
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
deleted file mode 100644
index 84558fc..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/DirectedOutputSelectorWrapper.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DirectedOutputSelectorWrapper<OUT> implements 
OutputSelectorWrapper<OUT> {
-
-       private static final long serialVersionUID = 1L;
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(DirectedOutputSelectorWrapper.class);
-
-       private List<OutputSelector<OUT>> outputSelectors;
-
-       private HashMap<String, ArrayList<Collector<StreamRecord<OUT>>>> 
outputMap;
-       private HashSet<Collector<StreamRecord<OUT>>> selectAllOutputs;
-
-       public DirectedOutputSelectorWrapper(List<OutputSelector<OUT>> 
outputSelectors) {
-               this.outputSelectors = outputSelectors;
-               this.selectAllOutputs = new 
HashSet<Collector<StreamRecord<OUT>>>();
-               this.outputMap = new HashMap<String, 
ArrayList<Collector<StreamRecord<OUT>>>>();
-       }
-       
-       @Override
-       public void addCollector(Collector<StreamRecord<OUT>> output, 
StreamEdge edge) {
-               List<String> selectedNames = edge.getSelectedNames();
-
-               if (selectedNames.isEmpty()) {
-                       selectAllOutputs.add(output);
-               }
-               else {
-                       for (String selectedName : selectedNames) {
-                               if (!outputMap.containsKey(selectedName)) {
-                                       outputMap.put(selectedName, new 
ArrayList<Collector<StreamRecord<OUT>>>());
-                                       outputMap.get(selectedName).add(output);
-                               }
-                               else {
-                                       if 
(!outputMap.get(selectedName).contains(output)) {
-                                               
outputMap.get(selectedName).add(output);
-                                       }
-                               }
-                       }
-               }
-       }
-
-       @Override
-       public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT 
record) {
-               Set<Collector<StreamRecord<OUT>>> selectedOutputs = new 
HashSet<Collector<StreamRecord<OUT>>>(selectAllOutputs);
-
-               for (OutputSelector<OUT> outputSelector : outputSelectors) {
-                       Iterable<String> outputNames = 
outputSelector.select(record);
-
-                       for (String outputName : outputNames) {
-                               List<Collector<StreamRecord<OUT>>> outputList = 
outputMap.get(outputName);
-
-                               try {
-                                       selectedOutputs.addAll(outputList);
-                               } catch (NullPointerException e) {
-                                       if (LOG.isErrorEnabled()) {
-                                               String format = String.format(
-                                                               "Cannot emit 
because no output is selected with the name: %s",
-                                                               outputName);
-                                               LOG.error(format);
-                                       }
-                               }
-                       }
-               }
-
-               return selectedOutputs;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
index f25c995..971e42b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapper.java
@@ -19,14 +19,7 @@ package org.apache.flink.streaming.api.collector.selector;
 
 import java.io.Serializable;
 
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
 public interface OutputSelectorWrapper<OUT> extends Serializable {
 
-       public void addCollector(Collector<StreamRecord<OUT>> output, 
StreamEdge edge);
-
-       public Iterable<Collector<StreamRecord<OUT>>> getSelectedOutputs(OUT 
record);
-
+       void sendOutputs(OUT record);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
deleted file mode 100644
index dca2ede..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/collector/selector/OutputSelectorWrapperFactory.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector.selector;
-
-import java.util.List;
-
-public class OutputSelectorWrapperFactory {
-
-       @SuppressWarnings({ "rawtypes", "unchecked" })
-       public static OutputSelectorWrapper<?> create(List<OutputSelector<?>> 
outputSelectors) {
-               if (outputSelectors.size() == 0) {
-                       return new BroadcastOutputSelectorWrapper();
-               } else {
-                       return new 
DirectedOutputSelectorWrapper(outputSelectors);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 7a07c79..311b7fb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.graph;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,7 +30,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.util.ClassLoaderUtil;
 import org.apache.flink.streaming.api.CheckpointingMode;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskException;
@@ -38,7 +39,7 @@ import org.apache.flink.util.InstantiationUtil;
 public class StreamConfig implements Serializable {
 
        private static final long serialVersionUID = 1L;
-
+       
        // 
------------------------------------------------------------------------
        //  Config Keys
        // 
------------------------------------------------------------------------
@@ -191,19 +192,22 @@ public class StreamConfig implements Serializable {
                }
        }
 
-       public void setOutputSelectorWrapper(OutputSelectorWrapper<?> 
outputSelectorWrapper) {
+       public void setOutputSelectors(List<OutputSelector<?>> outputSelectors) 
{
                try {
-                       
InstantiationUtil.writeObjectToConfig(outputSelectorWrapper, this.config, 
OUTPUT_SELECTOR_WRAPPER);
+                       InstantiationUtil.writeObjectToConfig(outputSelectors, 
this.config, OUTPUT_SELECTOR_WRAPPER);
                } catch (IOException e) {
-                       throw new StreamTaskException("Cannot serialize 
OutputSelectorWrapper.", e);
+                       throw new StreamTaskException("Could not serialize 
output selectors", e);
                }
        }
        
-       public <T> OutputSelectorWrapper<T> 
getOutputSelectorWrapper(ClassLoader cl) {
+       public <T> List<OutputSelector<T>> getOutputSelectors(ClassLoader 
userCodeClassloader) {
                try {
-                       return 
InstantiationUtil.readObjectFromConfig(this.config, OUTPUT_SELECTOR_WRAPPER, 
cl);
+                       List<OutputSelector<T>> selectors = 
+                                       
InstantiationUtil.readObjectFromConfig(this.config, OUTPUT_SELECTOR_WRAPPER, 
userCodeClassloader);
+                       return selectors == null ? 
Collections.<OutputSelector<T>>emptyList() : selectors;
+                       
                } catch (Exception e) {
-                       throw new StreamTaskException("Cannot deserialize and 
instantiate OutputSelectorWrapper.", e);
+                       throw new StreamTaskException("Could not read output 
selectors", e);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 0a612f3..3e06037 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -26,15 +26,11 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import 
org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapperFactory;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 
 /**
- * Class representing the operators in the streaming programs, with all their
- * properties.
- * 
+ * Class representing the operators in the streaming programs, with all their 
properties.
  */
 public class StreamNode implements Serializable {
 
@@ -168,10 +164,6 @@ public class StreamNode implements Serializable {
                return outputSelectors;
        }
 
-       public OutputSelectorWrapper<?> getOutputSelectorWrapper() {
-               return 
OutputSelectorWrapperFactory.create(getOutputSelectors());
-       }
-
        public void addOutputSelector(OutputSelector<?> outputSelector) {
                this.outputSelectors.add(outputSelector);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index c0d2856..c810e47 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -310,7 +310,7 @@ public class StreamingJobGraphGenerator {
                config.setTypeSerializerOut(vertex.getTypeSerializerOut());
 
                config.setStreamOperator(vertex.getOperator());
-               
config.setOutputSelectorWrapper(vertex.getOutputSelectorWrapper());
+               config.setOutputSelectors(vertex.getOutputSelectors());
 
                config.setNumberOfOutputs(nonChainableOutputs.size());
                config.setNonChainedOutputs(nonChainableOutputs);

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
deleted file mode 100644
index 01e997d..0000000
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CollectorWrapper.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.io;
-
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-
-import java.util.ArrayList;
-
-public class CollectorWrapper<OUT> implements Output<StreamRecord<OUT>> {
-
-       private OutputSelectorWrapper<OUT> outputSelectorWrapper;
-
-       private ArrayList<Output<StreamRecord<OUT>>> allOutputs;
-
-       public CollectorWrapper(OutputSelectorWrapper<OUT> 
outputSelectorWrapper) {
-               this.outputSelectorWrapper = outputSelectorWrapper;
-               allOutputs = new ArrayList<Output<StreamRecord<OUT>>>();
-       }
-       
-       public void addCollector(Output<StreamRecord<OUT>> output, StreamEdge 
edge) {
-               outputSelectorWrapper.addCollector(output, edge);
-               allOutputs.add(output);
-       }
-
-       @Override
-       public void collect(StreamRecord<OUT> record) {
-               for (Collector<StreamRecord<OUT>> output : 
outputSelectorWrapper.getSelectedOutputs(record.getValue())) {
-                       output.collect(record);
-               }
-       }
-
-       @Override
-       public void emitWatermark(Watermark mark) {
-               for (Output<?> output : allOutputs) {
-                       output.emitWatermark(mark);
-               }
-       }
-
-       @Override
-       public void close() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 125279c..5313bc9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -24,15 +24,16 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.collector.selector.DirectedOutput;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.io.CollectorWrapper;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
-import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -45,6 +46,13 @@ import 
org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * The {@code OperatorChain} contains all operators that are executed as one 
chain within a single
+ * {@link StreamTask}.
+ * 
+ * @param <OUT> The type of elements accepted by the chain, i.e., the input 
type of the chain's
+ *              head operator.
+ */
 public class OperatorChain<OUT> {
        
        private static final Logger LOG = 
LoggerFactory.getLogger(OperatorChain.class);
@@ -182,15 +190,14 @@ public class OperatorChain<OUT> {
                        Map<StreamEdge, RecordWriterOutput<?>> streamOutputs,
                        List<StreamOperator<?>> allOperators)
        {
-               // We create a wrapper that will encapsulate the chained 
operators and network outputs
-               OutputSelectorWrapper<T> outputSelectorWrapper = 
operatorConfig.getOutputSelectorWrapper(userCodeClassloader);
-               CollectorWrapper<T> wrapper = new 
CollectorWrapper<T>(outputSelectorWrapper);
-
+               List<Tuple2<Output<StreamRecord<T>>, StreamEdge>> allOutputs = 
new ArrayList<>(4);
+               
                // create collectors for the network outputs
                for (StreamEdge outputEdge : 
operatorConfig.getNonChainedOutputs(userCodeClassloader)) {
                        @SuppressWarnings("unchecked")
                        RecordWriterOutput<T> output = (RecordWriterOutput<T>) 
streamOutputs.get(outputEdge);
-                       wrapper.addCollector(output, outputEdge);
+                       
+                       allOutputs.add(new Tuple2<Output<StreamRecord<T>>, 
StreamEdge>(output, outputEdge));
                }
 
                // Create collectors for the chained outputs
@@ -200,9 +207,37 @@ public class OperatorChain<OUT> {
 
                        Output<StreamRecord<T>> output = createChainedOperator(
                                        containingTask, chainedOpConfig, 
chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
-                       wrapper.addCollector(output, outputEdge);
+                       
+                       allOutputs.add(new Tuple2<>(output, outputEdge));
+               }
+               
+               // if there are multiple outputs, or the outputs are directed, 
we need to
+               // wrap them as one output
+               
+               List<OutputSelector<T>> selectors = 
operatorConfig.getOutputSelectors(userCodeClassloader);
+               
+               if (selectors == null || selectors.isEmpty()) {
+                       // simple path, no selector necessary
+                       if (allOutputs.size() == 1) {
+                               return allOutputs.get(0).f0;
+                       }
+                       else {
+                               // send to N outputs. Note that this includes 
the special case
+                               // of sending to zero outputs
+                               @SuppressWarnings({"unchecked", "rawtypes"})
+                               Output<StreamRecord<T>>[] asArray = new 
Output[allOutputs.size()];
+                               for (int i = 0; i < allOutputs.size(); i++) {
+                                       asArray[i] = allOutputs.get(i).f0;
+                               }
+                               
+                               return new 
BroadcastingOutputCollector<T>(asArray);
+                       }
+               }
+               else {
+                       // selector present, more complex routing necessary
+                       return new DirectedOutput<T>(selectors, allOutputs);
+                       
                }
-               return wrapper;
        }
        
        private static <IN, OUT> Output<StreamRecord<IN>> createChainedOperator(
@@ -309,7 +344,6 @@ public class OperatorChain<OUT> {
                @Override
                public void collect(StreamRecord<T> record) {
                        try {
-
                                StreamRecord<T> copy = new 
StreamRecord<>(serializer.copy(record.getValue()), record.getTimestamp());
 
                                operator.setKeyContextElement1(copy);
@@ -320,4 +354,34 @@ public class OperatorChain<OUT> {
                        }
                }
        }
+       
+       private static final class BroadcastingOutputCollector<T> implements 
Output<StreamRecord<T>> {
+               
+               private final Output<StreamRecord<T>>[] outputs;
+               
+               public BroadcastingOutputCollector(Output<StreamRecord<T>>[] 
outputs) {
+                       this.outputs = outputs;
+               }
+
+               @Override
+               public void emitWatermark(Watermark mark) {
+                       for (Output<StreamRecord<T>> output : outputs) {
+                               output.emitWatermark(mark);
+                       }
+               }
+
+               @Override
+               public void collect(StreamRecord<T> record) {
+                       for (Output<StreamRecord<T>> output : outputs) {
+                               output.collect(record);
+                       }
+               }
+
+               @Override
+               public void close() {
+                       for (Output<StreamRecord<T>> output : outputs) {
+                               output.close();
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
index 8525d37..5126d11 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/OutputSplitterTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestListResultSink;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
+
 import org.junit.Test;
 
 public class OutputSplitterTest extends StreamingMultipleProgramsTestBase {

http://git-wip-us.apache.org/repos/asf/flink/blob/28c6254e/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 2cca3ff..e32b304 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -26,7 +27,6 @@ import 
org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleIn
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import 
org.apache.flink.streaming.api.collector.selector.BroadcastOutputSelectorWrapper;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
@@ -40,11 +40,11 @@ import org.apache.flink.util.InstantiationUtil;
 import org.junit.Assert;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-
 /**
  * Test harness for testing a {@link StreamTask}.
  *
@@ -91,6 +91,7 @@ public class StreamTaskTestHarness<OUT> {
        // input related methods only need to be implemented once, in generic 
form
        protected int numInputGates;
        protected int numInputChannelsPerGate;
+       
        @SuppressWarnings("rawtypes")
        protected StreamTestSingleInputGate[] inputGates;
 
@@ -128,7 +129,7 @@ public class StreamTaskTestHarness<OUT> {
 
                mockEnv.addOutput(outputList, outputStreamRecordSerializer);
 
-               streamConfig.setOutputSelectorWrapper(new 
BroadcastOutputSelectorWrapper<Object>());
+               
streamConfig.setOutputSelectors(Collections.<OutputSelector<?>>emptyList());
                streamConfig.setNumberOfOutputs(1);
 
                StreamOperator<OUT> dummyOperator = new 
AbstractStreamOperator<OUT>() {

Reply via email to